• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supavisor / 21579574864

02 Feb 2026 06:20AM UTC coverage: 65.58% (+0.08%) from 65.5%
21579574864

push

github

web-flow
fix: invalidate cache on tenant creation (#842)

Previously, if a client tried to connect before a tenant was created,
the `get_user_cache` function would cache the error. After the tenant
was created, connection attempts would still fail until the cache TTL.
Now the cache is cleared after inserts.

Additionally, cache and pool termination logic has been moved from the
controller to the domain module.

16 of 16 new or added lines in 1 file covered. (100.0%)

2 existing lines in 2 files now uncovered.

1932 of 2946 relevant lines covered (65.58%)

4216.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.68
/lib/supavisor/db_handler.ex
1
defmodule Supavisor.DbHandler do
2
  @moduledoc """
3
  This module contains functions to start a connection to the database, send
4
  requests to the database, and handle incoming messages from clients.
5

6
  The state machine uses the Supavisor.Protocol.Server module to decode messages
7
  from the database and sends messages to the client socket it received on checkout.
8
  """
9

10
  @behaviour :gen_statem
11

12
  require Logger
13
  require Supavisor.Protocol.Server, as: Server
14
  require Supavisor.Protocol.MessageStreamer, as: MessageStreamer
15

16
  alias Supavisor.Protocol.PreparedStatements
17

18
  alias Supavisor.{
19
    ClientHandler,
20
    FeatureFlag,
21
    HandlerHelpers,
22
    Helpers,
23
    Monitoring.Telem,
24
    Protocol.BackendMessageHandler,
25
    Protocol.Debug,
26
    Protocol.MessageStreamer,
27
    Protocol.Server
28
  }
29

30
  @type state :: :connect | :authentication | :idle | :busy | :terminating_with_error
31

32
  @reconnect_timeout 2_500
33
  @reconnect_timeout_proxy 500
34
  @sock_closed [:tcp_closed, :ssl_closed]
35
  @proto [:tcp, :ssl]
36
  @switch_active_count Application.compile_env(:supavisor, :switch_active_count)
37

38
  @doc """
  Starts a DbHandler state machine linked to the calling process.

  `config` is handed unchanged to `init/1`, which distinguishes two shapes:

    * proxied - a map containing `proxy: true`; the map itself is used as the
      handler configuration
    * not proxied - any other map; its `:id` is used to look the configuration
      up through `Supavisor.Manager.get_config/1`

  The process is started with `hibernate_after: 5_000`.
  """
  def start_link(config) do
    :gen_statem.start_link(__MODULE__, config, hibernate_after: 5_000)
  end
49

50
  @doc """
  Checks out a DbHandler process on behalf of a client.

  `sock` is the client socket and `caller` is the pid checking the handler out.
  The handler stores both and forwards database messages to `sock` when possible.

  Returns `{:ok, db_sock}`, where `db_sock` is the server socket the client may
  write messages to directly. If the call exits (for example because `timeout`
  elapsed), `{:error, {:exit, reason}}` is returned instead. A handler that is
  terminating with an error replies `{:error, error}`.
  """
  @spec checkout(pid(), Supavisor.sock(), pid(), timeout()) ::
          {:ok, Supavisor.sock()} | {:error, {:exit, term()}} | {:error, map()}
  def checkout(pid, sock, caller, timeout \\ 15_000) do
    request = {:checkout, sock, caller}

    try do
      :gen_statem.call(pid, request, timeout)
    catch
      # Only exits are converted; errors and throws still propagate.
      :exit, exit_reason -> {:error, {:exit, exit_reason}}
    end
  end
66

67
  @doc """
  Checks in a DbHandler process.

  Sends the `:checkin` cast to `pid` and returns immediately without waiting
  for the handler to process it.
  """
  @spec checkin(pid()) :: :ok
  def checkin(pid) do
    :gen_statem.cast(pid, :checkin)
  end
72

73
  @doc """
  Sends prepared statement packets to a DbHandler.

  Unlike most packets, prepared statement packets involve state kept by the
  DbHandler, so they cannot be written straight to the database socket. They
  must go through this function, which performs a synchronous call with a
  15 second timeout.
  """
  @spec handle_prepared_statement_pkts(pid, [PreparedStatements.handled_pkt()]) :: :ok
  def handle_prepared_statement_pkts(pid, pkts) do
    request = {:handle_ps_pkts, pkts}
    :gen_statem.call(pid, request, 15_000)
  end
84

85
  @doc """
  Returns the state and the mode of the DbHandler.

  The handler is queried with a 5 second timeout. Anything raised, thrown or
  exited while calling is returned as `{:error, {kind, reason}}`.
  """
  @spec get_state_and_mode(pid()) :: {:ok, {state, Supavisor.mode()}} | {:error, term()}
  def get_state_and_mode(pid) do
    try do
      reply = :gen_statem.call(pid, :get_state_and_mode, 5_000)
      {:ok, reply}
    catch
      kind, reason -> {:error, {kind, reason}}
    end
  end
94

95
  @doc """
  Stops a DbHandler.

  The handler is stopped with reason `{:shutdown, :client_termination}` and is
  given 5 seconds to terminate.
  """
  @spec stop(pid()) :: :ok
  def stop(pid) do
    Logger.debug("DbHandler: Stop pid #{inspect(pid)}")

    stop_reason = {:shutdown, :client_termination}
    :gen_statem.stop(pid, stop_reason, 5_000)
  end
103

104
  @impl true
  # Builds the initial state-machine data and schedules the first connection attempt.
  #
  # `args` is either a proxy configuration (a map with `proxy: true`), which is used
  # as-is, or a map whose `:id` is resolved through `Supavisor.Manager.get_config/1`.
  def init(args) do
    Process.flag(:trap_exit, true)

    config =
      case args do
        %{proxy: true} -> args
        %{} -> Supavisor.Manager.get_config(args.id)
      end

    id = args.id

    Helpers.set_log_level(config.log_level)
    Helpers.set_max_heap_size(90)

    {_, tenant_name} = config.tenant
    Logger.metadata(project: tenant_name, user: config.user, mode: config.mode)

    # Proxy mode: secrets already in config.auth from ClientHandler.
    # Pool mode: the upstream secrets are fetched and merged into the auth map.
    auth = if config[:proxy], do: config.auth, else: get_auth_with_secrets(config.auth, id)

    data = %{
      id: id,
      sock: nil,
      auth: auth,
      user: config.user,
      tenant: config.tenant,
      tenant_feature_flags: config.tenant_feature_flags,
      db_state: nil,
      parameter_status: %{},
      nonce: nil,
      server_proof: nil,
      stats: %{},
      prepared_statements: MapSet.new(),
      proxy: Map.get(config, :proxy, false),
      stream_state: MessageStreamer.new_stream_state(BackendMessageHandler),
      mode: config.mode,
      replica_type: config.replica_type,
      caller: Map.get(config, :caller) || nil,
      client_sock: Map.get(config, :client_sock) || nil,
      reconnect_retries: 0,
      terminating_error: nil
    }

    Telem.handler_action(:db_handler, :started, id)

    # Start in :connect and immediately trigger the connection attempt.
    {:ok, :connect, data, {:next_event, :internal, :connect}}
  end
160

161
  @impl true
  # All events are dispatched to the handle_event/4 clauses.
  def callback_mode do
    [:handle_event_function]
  end
163

164
  @impl true
  # Connection attempt: opens a TCP socket to the upstream database, optionally
  # upgrades it to TLS, sends the startup message and moves to :authentication.
  #
  # On any failure the socket opened by this attempt is closed before a retry is
  # scheduled (or the process is stopped); otherwise each failed attempt would
  # leave an open socket owned by this process for as long as it keeps retrying.
  def handle_event(:internal, :connect, :connect, %{auth: auth} = data) do
    Logger.debug("DbHandler: Try to connect to DB")

    sock_opts = [
      auth.ip_version,
      mode: :binary,
      packet: :raw,
      nodelay: true,
      active: false
    ]

    Telem.handler_action(:db_handler, :db_connection, data.id)

    case :gen_tcp.connect(auth.host, auth.port, sock_opts) do
      {:ok, tcp_sock} ->
        # Ensure buffer >= recbuf to avoid unnecessary copying
        # Set once at connection time as best effort; OS may adjust recbuf later via auto-tuning.
        {:ok, [{:recbuf, recbuf}]} = :inet.getopts(tcp_sock, [:recbuf])
        :ok = :inet.setopts(tcp_sock, buffer: recbuf)

        Logger.debug("DbHandler: auth #{inspect(auth, pretty: true)}")

        case try_ssl_handshake({:gen_tcp, tcp_sock}, auth) do
          {:ok, sock} ->
            tenant = if data.mode == :proxy, do: Supavisor.tenant(data.id)
            search_path = Supavisor.search_path(data.id)

            case send_startup(sock, auth, tenant, search_path) do
              :ok ->
                :ok = activate(sock)
                {:next_state, :authentication, %{data | sock: sock}}

              {:error, reason} ->
                Logger.error("DbHandler: Send startup error #{inspect(reason)}")
                # The connection is unusable; release it before retrying.
                HandlerHelpers.sock_close(sock)
                handle_connection_failure(reason, data)
            end

          {:error, reason} ->
            Logger.error("DbHandler: Handshake error #{inspect(reason)}")
            # The handshake failed, so the TCP socket opened above is released here.
            :gen_tcp.close(tcp_sock)
            maybe_reconnect(reason, data)
        end

      other ->
        Logger.error(
          "DbHandler: Connection failed #{inspect(other)} to #{inspect(auth.host)}:#{inspect(auth.port)}"
        )

        handle_connection_failure(other, data)
    end
  end
215

216
  # Handles the internal `{:terminate_with_error, error, pool_action}` event from any
  # state: optionally shuts the pool down, forwards the error to a checked-out client
  # and moves to :terminating_with_error.
  def handle_event(:internal, {:terminate_with_error, error, pool_action}, _state, data) do
    Logger.debug("DbHandler: Transitioning to terminating_with_error state")

    shutdown_pool? = pool_action == :shutdown_pool and not data.proxy

    if shutdown_pool? do
      Supavisor.Manager.shutdown_with_error(data.id, error)
    end

    # If not checked out yet, the postponed checkout will handle sending the error
    if not is_nil(data.client_sock) do
      encode_and_forward_error(error, data)
    end

    # Use cast to allow postponed events to be processed first
    :gen_statem.cast(self(), :finalize_termination)

    # This state will handle postponed checkout calls by returning the error
    terminating_data = %{data | terminating_error: error}
    {:next_state, :terminating_with_error, terminating_data}
  end
234

235
  # Final step of the error shutdown: the cast queued by the terminate_with_error
  # clause arrives here and the process stops normally.
  def handle_event(:cast, :finalize_termination, :terminating_with_error, _data) do
    Logger.debug("DbHandler: Stopping from terminating_with_error state")

    {:stop, :normal}
  end
239

240
  # Reconnect timer fired: bump the retry counter and trigger a new connection attempt.
  def handle_event(:state_timeout, :connect, _state, data) do
    attempt = data.reconnect_retries
    Logger.warning("DbHandler: Reconnect #{attempt} to DB")

    next_data = %{data | reconnect_retries: attempt + 1}
    {:keep_state, next_data, {:next_event, :internal, :connect}}
  end
246

247
  # Authentication phase: decodes the packets received from the database, folds them
  # through handle_auth_pkts/3 (which also sends the replies) and acts on the result.
  def handle_event(:info, {proto, _, bin}, :authentication, data) when proto in @proto do
    {:ok, packets, _} = Server.decode(bin)
    Logger.debug("DbHandler: dec_pkt, #{inspect(packets, pretty: true)}")

    # Builds the action that routes an error through the terminate_with_error event.
    abort = fn error, pool_action ->
      {:keep_state_and_data,
       {:next_event, :internal, {:terminate_with_error, error, pool_action}}}
    end

    outcome = Enum.reduce(packets, %{}, fn pkt, acc -> handle_auth_pkts(pkt, acc, data) end)

    case outcome do
      {:authentication_sasl, nonce} ->
        {:keep_state, %{data | nonce: nonce}}

      {:authentication_server_first_message, server_proof} ->
        {:keep_state, %{data | server_proof: server_proof}}

      %{authentication_server_final_message: _server_final} ->
        :keep_state_and_data

      %{authentication_ok: true} ->
        :keep_state_and_data

      password_step when password_step in [:authentication_md5, :authentication_cleartext] ->
        {:keep_state, data}

      {:error_response, %{"S" => "FATAL", "C" => "28P01"} = error} ->
        reason = error["M"] || "Authentication failed"
        handle_authentication_error(data, reason)
        Logger.error("DbHandler: Auth error #{inspect(error)}")

        abort.(error, :keep_pool)

      {:error_response, %{"S" => "FATAL", "C" => "3D000"} = error} ->
        Logger.error("DbHandler: Database does not exist: #{inspect(error)}")

        abort.(error, :shutdown_pool)

      {:error_response, %{"S" => "FATAL", "C" => "42501"} = error} ->
        Logger.error("DbHandler: Insufficient privilege: #{inspect(error)}")

        abort.(error, :shutdown_pool)

      {:error_response, error} ->
        Logger.error("DbHandler: Error response during auth: #{inspect(error)}")

        abort.(error, :keep_pool)

      {:ready_for_query, acc} ->
        ps = acc.ps

        Logger.debug(
          "DbHandler: DB ready_for_query: #{inspect(acc.db_state)} #{inspect(ps, pretty: true)}"
        )

        if data.mode != :proxy do
          Supavisor.set_parameter_status(data.id, ps)
        end

        # A successful authentication resets the reconnect counter.
        {:next_state, :idle, %{data | parameter_status: ps, reconnect_retries: 0}}

      other ->
        Logger.error("DbHandler: Undefined auth response #{inspect(other)}")
        {:stop, :auth_error, data}
    end
  end
316

317
  # Data arrived from the database while no client has the handler checked out;
  # it is only logged and otherwise ignored.
  def handle_event(:info, {proto, _, bin}, :idle, _data) when proto in @proto do
    Logger.debug("DbHandler: Got db response #{inspect(bin)} when idle")

    :keep_state_and_data
  end
322

323
  # Forwards data received from the database to the client while busy. When the
  # chunk ends with a ReadyForQuery message the caller is notified first, and what
  # happens next depends on the pool mode.
  def handle_event(:info, {proto, _, bin}, :busy, %{caller: caller} = data)
      when is_pid(caller) and proto in @proto do
    Logger.debug("DbHandler: Got messages: #{Debug.packet_to_string(bin, :backend)}")

    ready? = String.ends_with?(bin, Server.ready_for_query())

    # The status notification must precede forwarding the messages.
    if ready?, do: ClientHandler.db_status(caller, :ready_for_query)

    data = handle_server_messages(bin, data)

    case {ready?, data.mode} do
      {false, _mode} ->
        {:keep_state, data}

      {true, :transaction} ->
        # In transaction mode the handler is released back to :idle.
        {_, stats} = Telem.network_usage(:db, data.sock, data.id, data.stats)
        {:next_state, :idle, %{data | stats: stats, caller: nil, client_sock: nil}}

      {true, :proxy} ->
        {:keep_state, data}

      {true, :session} ->
        {_, stats} = Telem.network_usage(:db, data.sock, data.id, data.stats)
        {:keep_state, %{data | stats: stats}}
    end
  end
349

350
  # Processes a batch of prepared statement packets while busy: updates the statement
  # bookkeeping, evicts statements over the backend limit, writes everything to the
  # database socket and replies :ok.
  def handle_event({:call, from}, {:handle_ps_pkts, pkts}, :busy, data) do
    # The reducer prepends, so the accumulated iodata is in reverse order.
    {reversed_iodata, data} = Enum.reduce(pkts, {[], data}, &handle_prepared_statement_pkt/2)

    {close_pkts, remaining_statements} = evict_exceeding(data)

    # Reversing puts the eviction close packets after the client's packets.
    outgoing = Enum.reverse([close_pkts | reversed_iodata])
    :ok = HandlerHelpers.sock_send(data.sock, outgoing)

    # One {:intercept, :close} entry is queued per eviction close packet.
    stream_state =
      Enum.reduce(close_pkts, data.stream_state, fn _close_pkt, acc ->
        MessageStreamer.update_state(acc, fn queue ->
          :queue.in({:intercept, :close}, queue)
        end)
      end)

    data = %{data | stream_state: stream_state, prepared_statements: remaining_statements}

    {:keep_state, data, {:reply, from, :ok}}
  end
370

371
  # A checkout that reaches a handler which is shutting down with an error gets
  # that error as its reply.
  def handle_event({:call, from}, {:checkout, _sock, _caller}, :terminating_with_error, data) do
    Logger.debug("DbHandler: checkout call during terminating_with_error, replying with error")

    reply = {:error, data.terminating_error}
    {:keep_state_and_data, {:reply, from, reply}}
  end
375

376
  # Checkout request. In :idle or :busy the client socket and caller are stored and
  # the database socket is returned; in any other state the call is postponed until
  # the state changes.
  def handle_event({:call, from}, {:checkout, sock, caller}, state, data) do
    Logger.debug("DbHandler: checkout call when state was #{state}: #{inspect(caller)}")

    case state do
      ready when ready in [:idle, :busy] ->
        # In proxy mode the caller is sent the encoded parameter status.
        if data.mode == :proxy do
          bin_ps = Server.encode_parameter_status(data.parameter_status)
          send(caller, {:parameter_status, bin_ps})
        end

        checked_out = %{data | client_sock: sock, caller: caller}
        {:next_state, :busy, checked_out, {:reply, from, {:ok, data.sock}}}

      _not_ready ->
        {:keep_state_and_data, :postpone}
    end
  end
391

392
  # Replies with the stored parameter status while busy.
  def handle_event({:call, from}, :ps, :busy, data) do
    Logger.debug("DbHandler: get parameter status")

    reply = data.parameter_status
    {:keep_state_and_data, {:reply, from, reply}}
  end
396

397
  # The database socket closed while busy: stop the handler.
  def handle_event(_, {closed, _}, :busy, data) when closed in @sock_closed do
    stop_reason = {:shutdown, :db_termination}
    {:stop, stop_reason, data}
  end
400

401
  # The database socket closed in a state other than :busy. Depending on the
  # :reconnect_on_db_close setting the handler either schedules a reconnect or stops.
  def handle_event(_, {closed, _}, state, data) when closed in @sock_closed do
    # A close during an error shutdown is not logged.
    case state do
      :terminating_with_error -> nil
      _ -> Logger.error("DbHandler: Db connection closed when state was #{state}")
    end

    reconnect? = Application.get_env(:supavisor, :reconnect_on_db_close)

    if reconnect? do
      {:next_state, :connect, data, {:state_timeout, reconnect_timeout(data), :connect}}
    else
      {:stop, {:shutdown, :db_termination}, data}
    end
  end
410

411
  # A linked process (the ClientHandler) went down. Exits are trapped (see init/1),
  # so the exit arrives as an {:EXIT, pid, reason} message in whatever state the
  # handler is in. The database session is terminated, the socket closed and the
  # handler stops normally.
  def handle_event(_, {:EXIT, pid, reason}, _state, data) do
    if reason != :normal do
      Logger.error(
        "DbHandler: ClientHandler #{inspect(pid)} went down with reason #{inspect(reason)}"
      )
    end

    # data.sock is nil until a connection has been established (init/1 starts with
    # sock: nil), so there is nothing to terminate or close in that case.
    # NOTE(review): whether HandlerHelpers tolerates a nil socket is not visible here.
    if data.sock != nil do
      # Best effort: the results are intentionally ignored.
      HandlerHelpers.sock_send(data.sock, Server.terminate_message())
      HandlerHelpers.sock_close(data.sock)
    end

    {:stop, :normal}
  end
423

424
  # Replies with the current state name and the configured mode, in any state.
  def handle_event({:call, from}, :get_state_and_mode, state, data) do
    reply = {state, data.mode}
    {:keep_state_and_data, {:reply, from, reply}}
  end
427

428
  # The socket went passive after delivering its quota of messages; re-arm it with
  # the configured active count.
  def handle_event(:info, {event, _socket}, _, data) when event in [:tcp_passive, :ssl_passive] do
    HandlerHelpers.setopts(data.sock, active: @switch_active_count)

    :keep_state_and_data
  end
432

433
  # Catch-all: any event not matched by the clauses above is logged at debug level
  # and ignored. Note that a call landing here receives no reply.
  def handle_event(type, content, state, data) do
    details = [{"type", type}, {"content", content}, {"state", state}, {"data", data}]

    Logger.debug("DbHandler: Undefined msg: #{inspect(details, pretty: true)}")

    :keep_state_and_data
  end
445

446
  @impl true
  # Emits the :stopped telemetry action. Stops from the error-shutdown state are
  # not logged; for other states any reason besides :normal and :shutdown is
  # logged as an error.
  def terminate(_reason, :terminating_with_error, data),
    do: Telem.handler_action(:db_handler, :stopped, data.id)

  def terminate(reason, state, data) do
    Telem.handler_action(:db_handler, :stopped, data.id)

    if reason in [:normal, :shutdown] do
      :ok
    else
      Logger.error(
        "DbHandler: Terminating with reason #{inspect(reason)} when state was #{inspect(state)}"
      )
    end
  end
467

468
  @impl true
  # Replaces the :queue entry of the status map with an empty list before the
  # status is reported.
  def format_status(status) do
    Map.merge(status, %{queue: []})
  end
472

473
  # Encodes `message` as an error message and writes it to the client socket, if
  # one is set. Returns :noop when there is no client socket.
  @spec encode_and_forward_error(map(), map()) :: :ok | :noop
  defp encode_and_forward_error(message, %{client_sock: client_sock})
       when not is_nil(client_sock) do
    encoded = Server.encode_error_message(message)
    HandlerHelpers.sock_send(client_sock, encoded)
  end

  defp encode_and_forward_error(_message, _data), do: :noop
486

487
  # When `upstream_ssl: true` is set, sends an SSL request over the TCP socket and
  # continues with the server's answer; otherwise the socket is returned unchanged.
  @spec try_ssl_handshake(Supavisor.tcp_sock(), map) :: {:ok, Supavisor.sock()} | {:error, term()}
  defp try_ssl_handshake(sock, %{upstream_ssl: true} = auth) do
    # Anything other than :ok from the send is returned to the caller as-is.
    with :ok <- HandlerHelpers.sock_send(sock, Server.ssl_request()) do
      ssl_recv(sock, auth)
    end
  end

  defp try_ssl_handshake(sock, _auth), do: {:ok, sock}
496

497
  # Reads the server's single-byte answer to the SSL request (15 second timeout).
  #
  #   * `S` - the server accepts SSL; the socket is upgraded via ssl_connect/3
  #   * `N` - the server refuses SSL; `{:error, :ssl_not_available}`
  #   * any other byte - returned as `{:error, {:unexpected_ssl_response, byte}}`
  #     instead of crashing with a CaseClauseError, so the caller's normal
  #     handshake-error path (log + reconnect handling) applies
  #   * receive errors are returned unchanged
  @spec ssl_recv(Supavisor.tcp_sock(), map) :: {:ok, Supavisor.ssl_sock()} | {:error, term}
  defp ssl_recv({:gen_tcp, sock} = s, auth) do
    case :gen_tcp.recv(sock, 1, 15_000) do
      {:ok, <<?S>>} -> ssl_connect(s, auth)
      {:ok, <<?N>>} -> {:error, :ssl_not_available}
      {:ok, other} -> {:error, {:unexpected_ssl_response, other}}
      {:error, _} = error -> error
    end
  end
505

506
  # Upgrades the connected TCP socket to TLS. The TLS options depend on
  # `auth.upstream_verify` (:peer or :none).
  #
  # NOTE(review): in :peer mode the custom match_fun accepts every hostname, so the
  # certificate chain is verified against the configured CA but the hostname is not.
  @spec ssl_connect(Supavisor.tcp_sock(), map, pos_integer) ::
          {:ok, Supavisor.ssl_sock()} | {:error, term}
  defp ssl_connect({:gen_tcp, sock}, auth, timeout \\ 5000) do
    tls_opts =
      case auth.upstream_verify do
        :peer ->
          [
            verify: :verify_peer,
            cacerts: [auth.upstream_tls_ca],
            # unclear behavior on pg14
            server_name_indication: auth.sni_hostname || auth.host,
            customize_hostname_check: [{:match_fun, fn _, _ -> true end}]
          ]

        :none ->
          [verify: :verify_none]
      end

    sock
    |> :ssl.connect(tls_opts, timeout)
    |> case do
      {:ok, tls_sock} -> {:ok, {:ssl, tls_sock}}
      {:error, reason} -> {:error, reason}
    end
  end
532

533
  # Encodes and sends the startup message. The user name is suffixed with
  # ".<tenant>" when a tenant is given, and a search_path option is added when
  # `search_path` is set.
  @spec send_startup(Supavisor.sock(), map(), String.t() | nil, String.t() | nil) ::
          :ok | {:error, term}
  def send_startup(sock, auth, tenant, search_path) do
    db_user = get_user(auth)
    user = if tenant == nil, do: db_user, else: "#{db_user}.#{tenant}"

    base_params = [
      {"user", user},
      {"database", auth.database},
      {"application_name", auth.application_name}
    ]

    extra_params =
      if search_path, do: [{"options", "--search_path=#{search_path}"}], else: []

    startup_msg = :pgo_protocol.encode_startup_message(base_params ++ extra_params)

    HandlerHelpers.sock_send(sock, startup_msg)
  end
550

551
  # Switches the socket to active mode with the configured message count, using
  # the setopts function that matches the transport.
  @spec activate(Supavisor.sock()) :: :ok | {:error, term}
  defp activate({:gen_tcp, tcp_sock}), do: :inet.setopts(tcp_sock, active: @switch_active_count)

  defp activate({:ssl, tls_sock}), do: :ssl.setopts(tls_sock, active: @switch_active_count)
559

560
  # Returns the user name from the secrets. `auth.secrets` is a two-element tuple
  # whose second element is a zero-arity function returning the secrets map.
  defp get_user(auth) do
    {_auth_method, fetch_secrets} = auth.secrets
    fetch_secrets.().user
  end
565

566
  # Reducer over the packets decoded during authentication. Arguments are the
  # decoded packet, the accumulator and the state-machine data.
  @spec handle_auth_pkts(map(), map(), map()) :: any()
  # ParameterStatus: collect the key/value pair under :ps in the accumulator.
  defp handle_auth_pkts(%{tag: :parameter_status, payload: {k, v}}, acc, _) do
    update_in(acc[:ps], fn collected -> Map.put(collected || %{}, k, v) end)
  end

  # ReadyForQuery: tag the accumulator and record the reported database state.
  defp handle_auth_pkts(%{tag: :ready_for_query, payload: db_state}, acc, _) do
    {:ready_for_query, Map.put(acc, :db_state, db_state)}
  end
572

573
  # BackendKeyData: registers this process in the PoolPids registry together with
  # the key data and the connection target, and keeps the payload in the accumulator.
  defp handle_auth_pkts(%{tag: :backend_key_data, payload: payload}, acc, data) do
    if data.mode != :proxy do
      Logger.metadata(backend_pid: payload[:pid])
    end

    handler_pid = self()
    %{host: host, port: port, ip_version: ip_version} = data.auth
    target = %{host: host, port: port, ip_version: ip_version}

    Registry.register(Supavisor.Registry.PoolPids, handler_pid, Map.merge(payload, target))
    Logger.debug("DbHandler: Backend #{inspect(handler_pid)} data: #{inspect(payload)}")

    Map.put(acc, :backend_key_data, payload)
  end
584

585
  # SASL authentication request: answers with a SCRAM-SHA-256 initial response and
  # returns the generated nonce. When the method list cannot be decoded nothing is
  # sent and the nonce is nil.
  defp handle_auth_pkts(%{payload: {:authentication_sasl_password, methods_b}}, _, data) do
    case Server.decode_string(methods_b) do
      {:ok, req_method, _} ->
        Logger.debug("DbHandler: SASL method #{inspect(req_method)}")

        nonce = :pgo_scram.get_nonce(16)
        client_first = :pgo_scram.get_client_first(get_user(data.auth), nonce)

        # Mechanism name, NUL terminator, 32-bit length of the client-first
        # message, then the message itself.
        initial_response = [
          "SCRAM-SHA-256",
          0,
          <<IO.iodata_length(client_first)::32-integer>>,
          client_first
        ]

        encoded = :pgo_protocol.encode_scram_response_message(initial_response)
        :ok = HandlerHelpers.sock_send(data.sock, encoded)

        {:authentication_sasl, nonce}

      other ->
        Logger.error("DbHandler: Undefined sasl method #{inspect(other)}")
        {:authentication_sasl, nil}
    end
  end
613

614
  # SCRAM server-first message: computes and sends the client-final message and
  # returns the server proof obtained alongside it.
  defp handle_auth_pkts(
         %{payload: {:authentication_server_first_message, server_first}},
         _acc,
         %{nonce: nonce} = data
       ) do
    parsed_server_first = Helpers.parse_server_first(server_first, nonce)

    {_auth_method, fetch_secrets} = data.auth.secrets
    secrets = fetch_secrets.()

    {client_final, server_proof} =
      Helpers.get_client_final(
        data.auth.method,
        secrets,
        parsed_server_first,
        nonce,
        secrets.user,
        "biws"
      )

    encoded = :pgo_protocol.encode_scram_response_message(client_final)
    :ok = HandlerHelpers.sock_send(data.sock, encoded)

    {:authentication_server_first_message, server_proof}
  end
640

641
  # SCRAM server-final message: remembered in the accumulator.
  defp handle_auth_pkts(
         %{payload: {:authentication_server_final_message, server_final}},
         acc,
         _data
       ) do
    Map.put(acc, :authentication_server_final_message, server_final)
  end

  # AuthenticationOk: flagged in the accumulator.
  defp handle_auth_pkts(%{payload: :authentication_ok}, acc, _data) do
    Map.put(acc, :authentication_ok, true)
  end
654

655
  # MD5 password request: sends the salted MD5 response. With the :password method
  # the inner digest is computed from password and user; otherwise the stored
  # password value is used as the inner digest directly.
  defp handle_auth_pkts(%{payload: {:authentication_md5_password, salt}} = dec_pkt, _, data) do
    Logger.debug("DbHandler: dec_pkt, #{inspect(dec_pkt, pretty: true)}")

    {_auth_method, fetch_secrets} = data.auth.secrets
    secrets = fetch_secrets.()

    inner_digest =
      case data.auth.method do
        :password -> Helpers.md5([secrets.password, secrets.user])
        _ -> secrets.password
      end

    body = ["md5", Helpers.md5([inner_digest, salt]), 0]
    # Message: tag byte, 32-bit length (counting the length field itself), body.
    msg = [?p, <<IO.iodata_length(body) + 4::signed-32>>, body]
    :ok = HandlerHelpers.sock_send(data.sock, msg)

    :authentication_md5
  end
673

674
  # Cleartext password request: sends the NUL-terminated password.
  defp handle_auth_pkts(%{payload: :authentication_cleartext_password} = dec_pkt, _, data) do
    Logger.debug("DbHandler: dec_pkt, #{inspect(dec_pkt, pretty: true)}")

    {_auth_method, fetch_secrets} = data.auth.secrets
    %{password: password} = fetch_secrets.()

    body = <<password::binary, 0>>
    # Message: tag byte, 32-bit length (counting the length field itself), body.
    msg = [?p, <<IO.iodata_length(body) + 4::signed-32>>, body]
    :ok = HandlerHelpers.sock_send(data.sock, msg)

    :authentication_cleartext
  end
685

686
  # ErrorResponse: replaces the accumulator with the tagged error.
  defp handle_auth_pkts(%{tag: :error_response, payload: error}, _acc, _data) do
    {:error_response, error}
  end

  # Any other packet leaves the accumulator untouched.
  defp handle_auth_pkts(_pkt, acc, _data) do
    acc
  end
690

691
  # Reacts to an authentication failure reported by the database. In proxy mode
  # nothing is done; otherwise the cached secrets for the tenant/user pair are
  # invalidated.
  @spec handle_authentication_error(map(), String.t()) :: any()
  defp handle_authentication_error(%{mode: :proxy}, _reason) do
    :ok
  end

  defp handle_authentication_error(%{mode: _other, id: id, user: user}, _reason) do
    id
    |> Supavisor.tenant()
    |> Supavisor.SecretCache.invalidate(user)
  end
698

699
  # Delay, in milliseconds, before the next connection attempt. Proxied handlers
  # (`proxy: true`) use the shorter proxy timeout.
  @spec reconnect_timeout(map()) :: pos_integer()
  def reconnect_timeout(data) do
    case data do
      %{proxy: true} -> @reconnect_timeout_proxy
      _ -> @reconnect_timeout
    end
  end
705

706
  # Delivers data received from the database to the client socket. With the
  # "named_prepared_statements" feature flag the data first goes through the message
  # streamer, which decides which packets reach the client; without it the bytes are
  # forwarded untouched. Returns the (possibly updated) data.
  @spec handle_server_messages(binary(), map()) :: map()
  defp handle_server_messages(bin, data) do
    if FeatureFlag.enabled?(data.tenant_feature_flags, "named_prepared_statements") do
      {:ok, streamed_data, outgoing} = process_backend_streaming(bin, data)

      case outgoing do
        [] -> nil
        _ -> HandlerHelpers.sock_send(data.client_sock, outgoing)
      end

      streamed_data
    else
      HandlerHelpers.sock_send(data.client_sock, bin)
      data
    end
  end
722

723
  # Bind. If the prepared statement exists for us, it exists for the server, so only
  # the bind is sent to the socket. If it doesn't, the parse pkt must be sent first.
  #
  # When a bind arrives without a parse, the parse response has to be intercepted;
  # otherwise the client would receive a message it does not expect.
  defp handle_prepared_statement_pkt({:bind_pkt, stmt_name, pkt, parse_pkt}, {iodata, data}) do
    known? = stmt_name in data.prepared_statements

    if known? do
      {[pkt | iodata], data}
    else
      stream_state =
        MessageStreamer.update_state(data.stream_state, fn queue ->
          :queue.in({:intercept, :parse}, queue)
        end)

      statements = MapSet.put(data.prepared_statements, stmt_name)
      data = %{data | stream_state: stream_state, prepared_statements: statements}

      # The pair is prepended as one nested element so the parse stays ahead of
      # the bind once the accumulated list is reversed.
      {[[parse_pkt, pkt] | iodata], data}
    end
  end
744

745
  # Close: forget the statement, send the packet and let the response be forwarded.
  defp handle_prepared_statement_pkt({:close_pkt, stmt_name, pkt}, {iodata, data}) do
    statements = MapSet.delete(data.prepared_statements, stmt_name)

    stream_state =
      MessageStreamer.update_state(data.stream_state, fn queue ->
        :queue.in({:forward, :close}, queue)
      end)

    {[pkt | iodata], %{data | prepared_statements: statements, stream_state: stream_state}}
  end
756

757
  # Describe: sent as-is, no bookkeeping required.
  defp handle_prepared_statement_pkt({:describe_pkt, _stmt_name, pkt}, {iodata, data}),
    do: {[pkt | iodata], data}
760

761
  # Parse. A statement that is already known is not sent again; a parse response is
  # queued for injection instead. An unknown statement is recorded, sent, and its
  # response forwarded.
  #
  # If we stop generating unique id per statement, and instead do deterministic ids,
  # we need to potentially drop parse pkts and return a parse response
  defp handle_prepared_statement_pkt({:parse_pkt, stmt_name, pkt}, {iodata, data}) do
    enqueue = fn entry ->
      MessageStreamer.update_state(data.stream_state, fn queue ->
        :queue.in(entry, queue)
      end)
    end

    if stmt_name in data.prepared_statements do
      {iodata, %{data | stream_state: enqueue.({:inject, :parse})}}
    else
      statements = MapSet.put(data.prepared_statements, stmt_name)
      stream_state = enqueue.({:forward, :parse})

      {[pkt | iodata], %{data | prepared_statements: statements, stream_state: stream_state}}
    end
  end
787

788
  # Once the number of tracked statements reaches the backend limit, a random fifth
  # of the limit is evicted. Returns the close packets for the evicted statements
  # and the remaining statement set.
  defp evict_exceeding(%{prepared_statements: statements, id: id}) do
    limit = PreparedStatements.backend_limit()

    if MapSet.size(statements) < limit do
      {[], statements}
    else
      evict_count = div(limit, 5)
      evicted = statements |> Enum.take_random(evict_count) |> MapSet.new()
      close_pkts = Enum.map(evicted, &PreparedStatements.build_close_pkt/1)
      remaining = MapSet.difference(statements, evicted)
      Telem.prepared_statements_evicted(evict_count, id)

      {close_pkts, remaining}
    end
  end
803

804
  # Counts a failed connection attempt. Stops the handler once the count exceeds the
  # configured :reconnect_retries; otherwise schedules the next attempt.
  defp maybe_reconnect(reason, data) do
    max_retries = Application.get_env(:supavisor, :reconnect_retries)
    attempts = data.reconnect_retries + 1
    data = %{data | reconnect_retries: attempts}

    if attempts > max_retries do
      {:stop, {:failed_to_connect, reason}}
    else
      retry_in = reconnect_timeout(data)
      {:keep_state, data, {:state_timeout, retry_in, :connect}}
    end
  end
814

815
  # Runs the received bytes through the message streamer. On success the new stream
  # state is stored in the data and the packets to send are returned; any other
  # result is passed back unchanged.
  defp process_backend_streaming(bin, data) do
    with {:ok, stream_state, packets} <- MessageStreamer.handle_packets(data.stream_state, bin) do
      {:ok, %{data | stream_state: stream_state}, packets}
    end
  end
825

826
  # Adds the upstream secrets found for `id` to the auth map under :secrets.
  # Raises when the secrets cannot be found.
  defp get_auth_with_secrets(auth, id) do
    id
    |> Supavisor.SecretCache.get_upstream_auth_secrets()
    |> case do
      {:ok, secrets} -> Map.put(auth, :secrets, secrets)
      _other -> raise "Upstream connection secrets not found"
    end
  end
835

836
  # Records the failure with the tenant's circuit breaker (pool handlers only, not
  # proxied ones) and then applies the reconnect policy.
  defp handle_connection_failure(reason, data) do
    if not data.proxy do
      {_, tenant_name} = data.tenant
      Supavisor.CircuitBreaker.record_failure(tenant_name, :db_connection)
    end

    maybe_reconnect(reason, data)
  end
844
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc