• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supavisor / 24895189695

24 Apr 2026 02:36PM UTC coverage: 79.313% (-0.03%) from 79.342%
24895189695

push

github

web-flow
fix: retry on checkout when db_handler exits during checkout (#941)

Addresses a rare race condition.

8 of 13 new or added lines in 3 files covered. (61.54%)

1 existing line in 1 file now uncovered.

2515 of 3171 relevant lines covered (79.31%)

58036.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.31
/lib/supavisor/db_handler.ex
1
defmodule Supavisor.DbHandler do
2
  @moduledoc """
3
  This module contains functions to start a connection to the database, send
4
  requests to the database, and handle incoming messages from clients.
5

6
  The state machine uses the Supavisor.Protocol.Server module to decode messages
7
  from the database and sends messages to the client socket it received on checkout.
8

9
  ## Startup modes
10

11
  DbHandler has two startup paths depending on the mode:
12

13
  ### Pool mode (transaction/session)
14

15
  Started via `start_link/1` by poolboy with a Manager config. Fetches auth
16
  secrets (or enters `:waiting_for_secrets` if unavailable), then connects to
17
  the database immediately. Workers are long-lived and shared across clients
18
  via checkout/checkin.
19

20
  ### Proxy mode
21

22
  Started via `start_link/1` under a DynamicSupervisor with full connection
23
  args. Connects to the proxy node immediately. One worker per client, with the
24
  DynamicSupervisor's `:max_children` enforcing the connection limit.
25
  """
26

27
  @behaviour :gen_statem
28

29
  require Logger
30
  require Supavisor
31
  require Supavisor.Protocol.Server, as: Server
32
  require Supavisor.Protocol.MessageStreamer, as: MessageStreamer
33

34
  alias Supavisor.ConnectionParameters
35
  alias Supavisor.Errors.CheckoutError
36
  alias Supavisor.Errors.CheckoutTimeoutError
37
  alias Supavisor.Errors.DbHandlerExitedError
38
  alias Supavisor.Secrets.PasswordSecrets
39
  alias Supavisor.Protocol.{PreparedStatements, StartupOptions}
40

41
  alias Supavisor.{
42
    ClientHandler,
43
    FeatureFlag,
44
    HandlerHelpers,
45
    Helpers,
46
    Monitoring.Telem,
47
    Protocol.BackendMessageHandler,
48
    Protocol.Debug,
49
    Protocol.MessageStreamer,
50
    Protocol.Server
51
  }
52

53
  # States of the state machine. `:waiting_cleanup` is entered by the `:cleanup`
  # call handler after sending `DISCARD ALL` and left once the database answers.
  @type state ::
          :connect
          | :connect_cooldown
          | :authentication
          | :idle
          | :busy
          | :waiting_cleanup
          | :terminating_with_error
          | :waiting_for_secrets

  # Message tags announcing that the database socket was closed.
  @sock_closed [:tcp_closed, :ssl_closed]
  # Message tags carrying data received from the database socket.
  @proto [:tcp, :ssl]
  # Value passed as the `active:` socket option whenever the socket is (re)armed.
  @switch_active_count Application.compile_env(:supavisor, :switch_active_count)
  # Maximum number of bytes buffered while waiting for the cleanup response.
  @cleanup_buffer_limit 65_536
  @connect_cooldown_ms 2_500

  # Maps the "C" (SQLSTATE) field of an authentication error response to
  # `{pool_action, side_effect}`. Codes not listed fall back to
  # `{:keep_pool, :none}` at the lookup site.
  @auth_error_actions %{
    "28P01" => {:keep_pool, :invalidate_secrets},
    "28000" => {:keep_pool, :invalidate_secrets},
    "3D000" => {:shutdown_pool, :none},
    "42501" => {:shutdown_pool, :none},
    "22023" => {:shutdown_pool, :none},
    "57P03" => {:graceful_shutdown_pool, :none}
  }
76

77
  @doc """
  Spawns and links a DbHandler `:gen_statem` process initialised with `config`.

  The process is started with `hibernate_after: 5_000`.
  """
  def start_link(config) do
    statem_opts = [hibernate_after: 5_000]
    :gen_statem.start_link(__MODULE__, config, statem_opts)
  end
281✔
82

83
  # Child specification for running a DbHandler under a supervisor. The child is
  # `:temporary`, i.e. it is never restarted by its supervisor.
  def child_spec(config) do
    start_mfa = {__MODULE__, :start_link, [config]}

    %{id: __MODULE__, start: start_mfa, restart: :temporary}
  end
90

91
  @doc """
  Checks out the DbHandler `pid` on behalf of `caller`.

  `sock` is the client socket; the DbHandler forwards database messages
  directly to it when possible.

  On success returns `{:ok, server_sock}`, the database socket the client may
  write to directly. The call exits are translated into error tuples:

    * a call timeout becomes `{:error, %CheckoutTimeoutError{}}`
    * any other exit of the call becomes `{:error, %DbHandlerExitedError{}}`

  The DbHandler itself may also reply with `{:error, %CheckoutError{}}`.
  """
  @spec checkout(pid(), Supavisor.sock(), pid(), Supavisor.mode(), timeout()) ::
          {:ok, Supavisor.sock()}
          | {:error, DbHandlerExitedError.t()}
          | {:error, CheckoutError.t()}
          | {:error, CheckoutTimeoutError.t()}
  def checkout(pid, sock, caller, mode, timeout \\ 15_000) do
    request = {:checkout, sock, caller}

    try do
      :gen_statem.call(pid, request, timeout)
    catch
      :exit, {:timeout, _call} ->
        {:error, %CheckoutTimeoutError{mode: mode, timeout_ms: timeout}}

      :exit, {reason, _call} ->
        {:error, %DbHandlerExitedError{pid: pid, reason: reason}}
    end
  end
113

114
  @doc """
  Asks the DbHandler to reset session state by issuing DISCARD ALL.

  Before calling, the caller must make sure that the DbHandler:

  - is NOT in the middle of processing a query
  - is NOT inside a transaction (no uncommitted changes)
  - runs in session mode (not transaction mode)

  The call waits up to 5 seconds. If the call exits, the exit reason is
  returned as `{:error, {:exit, reason}}` instead of propagating.
  """
  @spec attempt_cleanup(pid()) :: :ok | {:error, term()}
  def attempt_cleanup(db_handler_pid) do
    try do
      :gen_statem.call(db_handler_pid, :cleanup, 5_000)
    catch
      :exit, exit_reason -> {:error, {:exit, exit_reason}}
    end
  end
129

130
  @doc """
  Hands prepared statement packets over to a DbHandler.

  Unlike most packets, prepared statement packets depend on state kept by the
  DbHandler, so they cannot be written straight to the database socket and
  must go through this function instead. The call waits up to 15 seconds.
  """
  @spec handle_prepared_statement_pkts(pid, [PreparedStatements.handled_pkt()]) :: :ok
  def handle_prepared_statement_pkts(pid, pkts) do
    request = {:handle_ps_pkts, pkts}
    :gen_statem.call(pid, request, 15_000)
  end
141

142
  @doc """
  Stops the DbHandler `pid` with reason `{:shutdown, :client_termination}`,
  waiting up to 5 seconds for it to terminate.
  """
  @spec stop(pid()) :: :ok
  def stop(pid) do
    Logger.debug("DbHandler: Stop pid #{inspect(pid)}")

    stop_reason = {:shutdown, :client_termination}
    :gen_statem.stop(pid, stop_reason, 5_000)
  end
150

151
  @doc """
  Tells a DbHandler (asynchronously, via cast) that secrets have become available.
  """
  @spec notify_secrets_available(pid()) :: :ok
  def notify_secrets_available(pid), do: :gen_statem.cast(pid, :secrets_available)
158

159
  @impl true
  def init(args) do
    # Exits of linked processes are delivered as `{:EXIT, pid, reason}` events.
    Process.flag(:trap_exit, true)

    # Proxy workers receive their whole configuration as `args`; all other
    # workers look it up from the Manager by id.
    config =
      case args do
        %{proxy: true} -> args
        %{} -> Supavisor.Manager.get_config(args.id)
      end

    id = args.id

    Helpers.set_log_level(config.log_level)
    Helpers.set_max_heap_size(90)

    Logger.metadata(project: config.tenant, user: config.user, mode: config.mode)

    # Fields taken from the configuration.
    configured = %{
      id: id,
      connection_params: config.connection_params,
      user: config.user,
      tenant: config.tenant,
      tenant_feature_flags: config.tenant_feature_flags,
      proxy: Map.get(config, :proxy, false),
      client_tls: Map.get(config, :client_tls),
      client_jit: Map.get(config, :client_jit),
      mode: config.mode,
      replica_type: config.replica_type
    }

    # Fields that start out empty and are filled in while the worker runs.
    blank = %{
      sock: nil,
      db_state: nil,
      parameter_status: %{},
      nonce: nil,
      server_proof: nil,
      stats: %{},
      prepared_statements: MapSet.new(),
      stream_state: MessageStreamer.new_stream_state(BackendMessageHandler),
      caller: nil,
      client_sock: nil,
      terminating_error: nil,
      manager_ref: nil,
      derived_secrets: nil
    }

    data = Map.merge(configured, blank)

    Telem.handler_action(:db_handler, :started, id)

    # Proxy workers never wait; other workers may have to sit out a cooldown.
    remaining_ms =
      if data.proxy do
        0
      else
        connect_cooldown_remaining(id)
      end

    if remaining_ms > 0 do
      {:ok, :connect_cooldown, data, {:state_timeout, remaining_ms, :connect}}
    else
      {first_state, data, actions} = resolve_secrets(data)
      {:ok, first_state, data, actions}
    end
  end
214

215
  # All events are dispatched to `handle_event/4`.
  @impl true
  def callback_mode do
    [:handle_event_function]
  end
281✔
217

218
  # The connect cooldown elapsed: resolve secrets and move on to whichever
  # state (and actions) `resolve_secrets/1` selects.
  @impl true
  def handle_event(:state_timeout, :connect, :connect_cooldown, data) do
    {state_after_cooldown, updated_data, actions} = resolve_secrets(data)

    {:next_state, state_after_cooldown, updated_data, actions}
  end
223

224
  # Opens the TCP connection to the database, optionally upgrades it to SSL,
  # sends the startup message and moves to `:authentication`. Every failure is
  # logged and delegated to `handle_connection_failure/2`.
  def handle_event(:internal, :connect, :connect, %{connection_params: conn_params} = data) do
    Logger.debug("DbHandler: Try to connect to DB")

    # The socket starts passive; it is armed by `activate/1` after startup is sent.
    connect_opts = [
      conn_params.ip_version
      | [mode: :binary, packet: :raw, nodelay: true, active: false]
    ]

    Telem.handler_action(:db_handler, :db_connection, data.id)

    case :gen_tcp.connect(conn_params.host, conn_params.port, connect_opts) do
      {:ok, tcp_sock} ->
        # Ensure buffer >= recbuf to avoid unnecessary copying.
        # Done once at connection time as best effort; the OS may adjust recbuf
        # later via auto-tuning.
        {:ok, [{:recbuf, recbuf}]} = :inet.getopts(tcp_sock, [:recbuf])
        :ok = :inet.setopts(tcp_sock, buffer: recbuf)

        Logger.debug("DbHandler: connection_params #{inspect(conn_params, pretty: true)}")

        case try_ssl_handshake({:gen_tcp, tcp_sock}, conn_params) do
          {:error, reason} ->
            Logger.error("DbHandler: Handshake error #{inspect(reason)}")
            handle_connection_failure(reason, data)

          {:ok, db_sock} ->
            # Tenant suffix and client TLS/JIT options are only sent by proxy workers.
            proxy? = data.proxy
            tenant = if proxy?, do: Supavisor.id(data.id, :tenant)

            startup_options = %{
              "search_path" => Supavisor.id(data.id, :search_path),
              "client_tls" => if(proxy?, do: to_string(data.client_tls)),
              "jit" => if(proxy?, do: to_string(data.client_jit))
            }

            case send_startup(db_sock, conn_params, tenant, startup_options) do
              {:error, reason} ->
                Logger.error("DbHandler: Send startup error #{inspect(reason)}")
                handle_connection_failure(reason, data)

              :ok ->
                :ok = activate(db_sock)
                {:next_state, :authentication, %{data | sock: db_sock}}
            end
        end

      connect_failure ->
        Logger.error(
          "DbHandler: Connection failed #{inspect(connect_failure)} to #{inspect(conn_params.host)}:#{inspect(conn_params.port)}"
        )

        handle_connection_failure(connect_failure, data)
    end
  end
279

280
  # Enters `:terminating_with_error`: applies the requested pool action, forwards
  # the error to an already checked-out client and schedules the final stop.
  def handle_event(:internal, {:terminate_with_error, error, pool_action}, _state, data) do
    Logger.debug("DbHandler: Transitioning to terminating_with_error state")

    # Pool-level actions are only taken when `data.proxy` is `false`.
    case {pool_action, data.proxy} do
      {:shutdown_pool, false} ->
        Supavisor.Manager.shutdown_with_error(data.id, error)

      {:graceful_shutdown_pool, false} ->
        Supavisor.async_stop(data.id)

      _no_pool_action ->
        :ok
    end

    # Without a checked-out client, the postponed checkout call reports the error instead.
    if not is_nil(data.client_sock), do: encode_and_forward_error(error, data)

    # A cast (rather than an immediate stop) lets postponed events run first.
    :gen_statem.cast(self(), :finalize_termination)

    # Postponed checkout calls are answered with the stored error in this state.
    {:next_state, :terminating_with_error, %{data | terminating_error: error}}
  end
305

306
  # Final step of `:terminating_with_error`: stop the process normally.
  def handle_event(:cast, :finalize_termination, :terminating_with_error, _data) do
    Logger.debug("DbHandler: Stopping from terminating_with_error state")

    {:stop, :normal}
  end

  # The database did not finish the cleanup before the state timeout fired.
  def handle_event(:state_timeout, :cleanup_timeout, :waiting_cleanup, _data) do
    Logger.error("DbHandler: Cleanup timeout, shutting down")

    {:stop, :normal}
  end
315

316
  # Data from the database while authenticating. The decoded packets are folded
  # through `handle_auth_pkts/3`; the folded result decides the next step.
  def handle_event(:info, {proto, _, bin}, :authentication, data) when proto in @proto do
    {:ok, packets, _rest} = Server.decode(bin)
    Logger.debug("DbHandler: dec_pkt, #{inspect(packets, pretty: true)}")

    auth_result =
      Enum.reduce(packets, %{}, fn pkt, acc -> handle_auth_pkts(pkt, acc, data) end)

    case auth_result do
      {:authentication_sasl, nonce} ->
        {:keep_state, %{data | nonce: nonce}}

      {:authentication_server_first_message, server_proof, derived_secrets} ->
        {:keep_state, %{data | server_proof: server_proof, derived_secrets: derived_secrets}}

      %{authentication_server_final_message: _server_final} ->
        :keep_state_and_data

      %{authentication_ok: true} ->
        :keep_state_and_data

      password_auth when password_auth in [:authentication_md5, :authentication_cleartext] ->
        {:keep_state, data}

      {:error_response, error} ->
        # Unknown error codes keep the pool and have no side effect.
        {pool_action, side_effect} =
          Map.get(@auth_error_actions, error["C"], {:keep_pool, :none})

        case side_effect do
          :invalidate_secrets ->
            handle_authentication_error(data, error["M"] || "Authentication failed")

          _none ->
            :ok
        end

        Logger.error("DbHandler: Auth error #{inspect(error)}")

        terminate_event = {:terminate_with_error, error, pool_action}
        {:keep_state_and_data, {:next_event, :internal, terminate_event}}

      {:ready_for_query, acc} ->
        ps = acc.ps

        Logger.debug(
          "DbHandler: DB ready_for_query: #{inspect(acc.db_state)} #{inspect(ps, pretty: true)}"
        )

        # Parameter status and derived secrets are only published outside proxy mode.
        if data.mode != :proxy do
          Supavisor.set_parameter_status(data.id, ps)
          cache_derived_secrets(data)
        end

        {:next_state, :idle, %{data | parameter_status: ps, derived_secrets: nil}}

      unexpected ->
        Logger.error("DbHandler: Undefined auth response #{inspect(unexpected)}")
        {:stop, :auth_error, data}
    end
  end
373

374
  # Data arrived from the database while no client is attached (`:idle`):
  # it is only logged and otherwise ignored.
  def handle_event(:info, {proto, _, bin}, :idle, _data) when proto in @proto do
    Logger.debug("DbHandler: Got db response #{inspect(bin)} when idle")

    :keep_state_and_data
  end
379

380
  # Data from the database while a client is checked out: pass it on to the
  # client. When the chunk ends with ReadyForQuery the client handler is
  # notified first, and the follow-up depends on the pool mode.
  def handle_event(:info, {proto, _, bin}, :busy, %{caller: caller} = data)
      when is_pid(caller) and proto in @proto do
    Logger.debug("DbHandler: Got messages: #{Debug.packet_to_string(bin, :backend)}")

    if not String.ends_with?(bin, Server.ready_for_query()) do
      {:keep_state, handle_server_messages(bin, data)}
    else
      ClientHandler.db_status(data.caller, :ready_for_query)
      data = handle_server_messages(bin, data)

      case data.mode do
        :session ->
          {_, stats} = Telem.network_usage(:db, data.sock, data.id, data.stats)
          {:keep_state, %{data | stats: stats}}

        :transaction ->
          # The connection is released: forget the client and go back to `:idle`.
          {_, stats} = Telem.network_usage(:db, data.sock, data.id, data.stats)
          {:next_state, :idle, %{data | stats: stats, caller: nil, client_sock: nil}}

        :proxy ->
          {:keep_state, data}
      end
    end
  end
406

407
  # Data from the database while waiting for the cleanup to complete. Chunks are
  # accumulated in `pending_bin` until the buffer ends with ReadyForQuery (the
  # pending cleanup call is then answered with `:ok`) or the buffer grows past
  # `@cleanup_buffer_limit` (the process stops).
  def handle_event(:info, {proto, _, bin}, :waiting_cleanup, %{caller: caller} = data)
      when is_pid(caller) and proto in @proto do
    accumulated = data.pending_bin <> bin

    if String.ends_with?(accumulated, Server.ready_for_query()) do
      cleaned = %{data | caller: nil, waiting_cleanup: nil, pending_bin: nil}
      {:next_state, :idle, cleaned, {:reply, data.waiting_cleanup, :ok}}
    else
      if byte_size(accumulated) > @cleanup_buffer_limit do
        Logger.error("DbHandler: Cleanup buffer limit exceeded, shutting down")
        {:stop, :normal}
      else
        {:keep_state, %{data | pending_bin: accumulated}}
      end
    end
  end
424

425
  # Processes prepared statement packets for the checked-out client: folds them
  # through `handle_prepared_statement_pkt/2`, evicts statements via
  # `evict_exceeding/1`, writes everything to the database socket and queues one
  # `{:intercept, :close}` entry in the stream state per close packet sent.
  def handle_event({:call, from}, {:handle_ps_pkts, pkts}, :busy, data) do
    {reversed_iodata, data} =
      Enum.reduce(pkts, {[], data}, &handle_prepared_statement_pkt/2)

    {close_pkts, remaining_statements} = evict_exceeding(data)

    # The iodata was accumulated by prepending, hence the reverse before sending.
    outgoing = Enum.reverse([close_pkts | reversed_iodata])
    :ok = HandlerHelpers.sock_send(data.sock, outgoing)

    stream_state =
      Enum.reduce(close_pkts, data.stream_state, fn _close_pkt, acc_state ->
        MessageStreamer.update_state(acc_state, fn queue ->
          :queue.in({:intercept, :close}, queue)
        end)
      end)

    new_data = %{data | stream_state: stream_state, prepared_statements: remaining_statements}

    {:keep_state, new_data, {:reply, from, :ok}}
  end
445

446
  # Checkout while terminating: answer with the error that caused the termination.
  def handle_event({:call, from}, {:checkout, _sock, _caller}, :terminating_with_error, data) do
    Logger.debug("DbHandler: checkout call during terminating_with_error, replying with error")

    checkout_error = %CheckoutError{pid: self(), postgres_error: data.terminating_error}

    {:keep_state_and_data, {:reply, from, {:error, checkout_error}}}
  end

  # Checkout while no secrets are available: answer with a synthetic
  # authentication failure asking the client to reconnect.
  def handle_event({:call, from}, {:checkout, _sock, _caller}, :waiting_for_secrets, _data) do
    Logger.debug("DbHandler: checkout call during waiting_for_secrets, replying with error")

    message =
      "Authentication credentials are invalid. Please reconnect with fresh credentials to restore pool functionality."

    checkout_error = %CheckoutError{
      pid: self(),
      postgres_error: %{"S" => "FATAL", "C" => "28P01", "M" => message}
    }

    {:keep_state_and_data, {:reply, from, {:error, checkout_error}}}
  end
465

466
  # Checkout in any other state. In `:idle` or `:busy` the caller is linked,
  # remembered together with its socket, and handed the database socket. In
  # every other state the call is postponed until the state changes.
  def handle_event({:call, from}, {:checkout, sock, caller}, state, data) do
    Logger.debug("DbHandler: checkout call when state was #{state}: #{inspect(caller)}")

    case state do
      ready when ready in [:idle, :busy] ->
        Process.link(caller)

        # In proxy mode the caller also gets the encoded parameter status as a message.
        if data.mode == :proxy do
          encoded_ps = Server.encode_parameter_status(data.parameter_status)
          send(caller, {:parameter_status, encoded_ps})
        end

        checked_out = %{data | client_sock: sock, caller: caller}
        {:next_state, :busy, checked_out, {:reply, from, {:ok, data.sock}}}

      _not_ready ->
        {:keep_state_and_data, :postpone}
    end
  end
483

484
  # Replies with the stored parameter status (only handled while `:busy`).
  def handle_event({:call, from}, :ps, :busy, data) do
    Logger.debug("DbHandler: get parameter status")

    reply = {:reply, from, data.parameter_status}
    {:keep_state_and_data, reply}
  end
488

489
  # Cleanup request. Rejected in transaction mode and in any state other than
  # `:idle`/`:busy`. Otherwise DISCARD ALL is sent and the reply is deferred:
  # the caller is stored in `waiting_cleanup` and answered once the database
  # responds, or the process stops when the 5 second state timeout fires.
  def handle_event({:call, from}, :cleanup, state, data) do
    Logger.debug("DbHandler: Cleanup requested, current state: #{inspect(state)}")

    case {data.mode, state} do
      {:transaction, _any_state} ->
        Logger.error(
          "DbHandler: Cleanup called on transaction mode - only supported in session mode"
        )

        rejection = {:error, :cleanup_not_supported_in_transaction_mode}
        {:keep_state_and_data, {:reply, from, rejection}}

      {_mode, ready} when ready in [:idle, :busy] ->
        Logger.debug("DbHandler: Starting cleanup, sending DISCARD ALL")
        discard_all = :pgo_protocol.encode_query_message("DISCARD ALL")
        :ok = HandlerHelpers.sock_send(data.sock, discard_all)

        waiting_data =
          data
          |> Map.put(:waiting_cleanup, from)
          |> Map.put(:pending_bin, <<>>)

        {:next_state, :waiting_cleanup, waiting_data, {:state_timeout, 5_000, :cleanup_timeout}}

      _cannot_cleanup ->
        Logger.warning("DbHandler: Cannot cleanup in state #{inspect(state)}")
        {:keep_state_and_data, {:reply, from, {:error, :cant_cleanup_now}}}
    end
  end
515

516
  # The database socket closed while a client was checked out.
  def handle_event(_event_type, {closed, _socket}, :busy, data) when closed in @sock_closed do
    {:stop, {:shutdown, :db_termination}, data}
  end

  # The database socket closed during authentication: handled as a connection failure.
  def handle_event(_event_type, {closed, _socket}, :authentication, data)
      when closed in @sock_closed do
    Logger.error("DbHandler: Db connection closed when state was authentication")

    handle_connection_failure({:error, :db_connection_closed_in_auth}, data)
  end

  # The database socket closed in any other state. Nothing is logged while
  # already terminating with an error.
  def handle_event(_event_type, {closed, _socket}, state, data) when closed in @sock_closed do
    state == :terminating_with_error ||
      Logger.error("DbHandler: Db connection closed when state was #{state}")

    {:stop, {:shutdown, :db_termination}, data}
  end
533

534
  # A linked process (the client handler) exited. Non-normal reasons are
  # logged; in every case a Terminate message is sent to the database, the
  # socket is closed and this process stops normally.
  def handle_event(_event_type, {:EXIT, pid, reason}, _state, data) do
    case reason do
      :normal ->
        :ok

      abnormal ->
        Logger.error(
          "DbHandler: ClientHandler #{inspect(pid)} went down with reason #{inspect(abnormal)}"
        )
    end

    db_sock = data.sock
    HandlerHelpers.sock_send(db_sock, Server.terminate_message())
    HandlerHelpers.sock_close(db_sock)

    {:stop, :normal}
  end
546

547
  # Replies with `{current_state, mode}` without changing anything.
  def handle_event({:call, from}, :get_state_and_mode, state, data) do
    reply = {:reply, from, {state, data.mode}}
    {:keep_state_and_data, reply}
  end
550

551
  # Secrets were announced while waiting for them. If they can be loaded, the
  # Manager monitor is released and the worker proceeds to `:connect`.
  #
  # When the secrets are still missing the worker keeps waiting, so the Manager
  # monitor must stay in place: dropping it here would leave `manager_ref`
  # pointing at a dead monitor, and a later Manager death would never reach the
  # `:DOWN` handler for `:waiting_for_secrets`.
  def handle_event(:cast, :secrets_available, :waiting_for_secrets, data) do
    Logger.info("DbHandler: Secrets are now available, transitioning to connect state")

    case get_connection_params_with_secrets(data.connection_params, data.id) do
      {:ok, conn_params_with_secrets} ->
        # `:flush` also removes a `:DOWN` message that may already be queued.
        Process.demonitor(data.manager_ref, [:flush])

        {:next_state, :connect,
         %{data | connection_params: conn_params_with_secrets, manager_ref: nil},
         {:next_event, :internal, :connect}}

      {:error, :no_secrets} ->
        Logger.error("DbHandler: Still no secrets available after notification")
        :keep_state_and_data
    end
  end
567

568
  # The monitored Manager went down while waiting for secrets: stop normally.
  def handle_event(:info, {:DOWN, ref, :process, _pid, down_reason}, :waiting_for_secrets, data)
      when ref == data.manager_ref do
    Logger.error("DbHandler: Manager died while waiting for secrets: #{inspect(down_reason)}")

    {:stop, :normal, data}
  end
573

574
  # The socket switched to passive mode: re-arm it with `@switch_active_count`.
  def handle_event(:info, {passive_tag, _socket}, _state, data)
      when passive_tag in [:tcp_passive, :ssl_passive] do
    HandlerHelpers.setopts(data.sock, active: @switch_active_count)

    :keep_state_and_data
  end
578

579
  # Catch-all: any event not matched above is logged at debug level and ignored.
  def handle_event(type, content, state, data) do
    details = [{"type", type}, {"content", content}, {"state", state}, {"data", data}]

    Logger.debug("DbHandler: Undefined msg: #{inspect(details, pretty: true)}")

    :keep_state_and_data
  end
591

592
  # Termination from `:terminating_with_error` only emits the `:stopped`
  # telemetry action; the reason is not logged here.
  @impl true
  def terminate(_reason, :terminating_with_error, data) do
    Telem.handler_action(:db_handler, :stopped, data.id)
  end

  # Any other termination emits the same telemetry action and logs every reason
  # other than `:normal` and `:shutdown` as an error.
  def terminate(reason, state, data) do
    Telem.handler_action(:db_handler, :stopped, data.id)

    if reason in [:normal, :shutdown] do
      :ok
    else
      Logger.error(
        "DbHandler: Terminating with reason #{inspect(reason)} when state was #{inspect(state)}"
      )
    end
  end
613

614
  # Returns `status` with its `:queue` entry replaced by an empty list.
  @impl true
  def format_status(status) do
    Map.merge(status, %{queue: []})
  end
618

619
  # Encodes `message` as an error message and writes it to the client socket
  # stored in `data`, returning the result of the send. Returns `:noop` when
  # `data` has no client socket (`client_sock` missing or `nil`).
  #
  # The spec includes `{:error, term()}` because the send result is returned
  # as-is and other call sites in this module handle a non-`:ok` send result.
  @spec encode_and_forward_error(map(), map()) :: :ok | {:error, term()} | :noop
  defp encode_and_forward_error(message, %{client_sock: sock}) when not is_nil(sock) do
    HandlerHelpers.sock_send(sock, Server.encode_error_message(message))
  end

  defp encode_and_forward_error(_message, _data), do: :noop
632

633
  # With `upstream_ssl: true`, sends an SSL request over the TCP socket and
  # continues with `ssl_recv/2`; a failed send is returned unchanged. For any
  # other connection parameters the socket is returned untouched.
  @spec try_ssl_handshake(Supavisor.tcp_sock(), ConnectionParameters.t()) ::
          {:ok, Supavisor.sock()} | {:error, term()}
  defp try_ssl_handshake(sock, %ConnectionParameters{upstream_ssl: true} = conn_params) do
    with :ok <- HandlerHelpers.sock_send(sock, Server.ssl_request()) do
      ssl_recv(sock, conn_params)
    end
  end

  defp try_ssl_handshake(sock, _conn_params) do
    {:ok, sock}
  end
282✔
643

644
  # Reads the server's single-byte answer to the SSL request (waiting up to 15
  # seconds) and acts on it:
  #
  #   * `S` - the server accepts SSL; continue with `ssl_connect/2`
  #   * `N` - the server refuses SSL; `{:error, :ssl_not_available}`
  #   * any other byte - `{:error, {:unexpected_ssl_response, byte}}`. A server
  #     may answer an SSL request with something else (e.g. the first byte of an
  #     error response); returning an error tuple lets the caller treat it as a
  #     handshake failure instead of crashing with a `CaseClauseError`.
  #   * receive errors are returned unchanged
  @spec ssl_recv(Supavisor.tcp_sock(), ConnectionParameters.t()) ::
          {:ok, Supavisor.ssl_sock()} | {:error, term}
  defp ssl_recv({:gen_tcp, sock} = s, conn_params) do
    case :gen_tcp.recv(sock, 1, 15_000) do
      {:ok, <<?S>>} -> ssl_connect(s, conn_params)
      {:ok, <<?N>>} -> {:error, :ssl_not_available}
      {:ok, other} -> {:error, {:unexpected_ssl_response, other}}
      {:error, _} = error -> error
    end
  end
653

654
  # Upgrades the TCP socket to SSL. The options depend on
  # `conn_params.upstream_verify` (`:peer` or `:none`); on success the SSL
  # socket is returned wrapped as `{:ssl, ssl_sock}`.
  @spec ssl_connect(Supavisor.tcp_sock(), ConnectionParameters.t(), pos_integer) ::
          {:ok, Supavisor.ssl_sock()} | {:error, term}
  defp ssl_connect({:gen_tcp, sock}, conn_params, timeout \\ 5000) do
    # Shared by both verification modes.
    receiver_opts = [receiver_spawn_opts: [min_heap_size: 2048]]

    tls_opts =
      case conn_params.upstream_verify do
        :peer ->
          [
            verify: :verify_peer,
            cacerts: [conn_params.upstream_tls_ca],
            # unclear behavior on pg14
            server_name_indication: conn_params.sni_hostname || conn_params.host,
            customize_hostname_check: [{:match_fun, fn _, _ -> true end}]
          ] ++ receiver_opts

        :none ->
          [verify: :verify_none] ++ receiver_opts
      end

    case :ssl.connect(sock, tls_opts, timeout) do
      {:ok, ssl_sock} -> {:ok, {:ssl, ssl_sock}}
      {:error, _reason} = failure -> failure
    end
  end
681

682
  # Builds the startup message and writes it to `sock`.
  #
  # The user is the secrets' user, suffixed with `.tenant` when `tenant` is not
  # nil. Entries of `options` with a nil value are dropped; the "options"
  # startup parameter is only included when at least one entry remains.
  @spec send_startup(Supavisor.sock(), ConnectionParameters.t(), String.t() | nil, map()) ::
          :ok | {:error, term}
  def send_startup(sock, conn_params, tenant, options) do
    base_user = conn_params.secrets.user

    user =
      case tenant do
        nil -> base_user
        _ -> "#{base_user}.#{tenant}"
      end

    present_options = Map.reject(options, fn {_key, value} -> is_nil(value) end)

    extra_params =
      if map_size(present_options) == 0 do
        []
      else
        [{"options", StartupOptions.encode(present_options)}]
      end

    startup_params = [
      {"user", user},
      {"database", conn_params.database},
      {"application_name", conn_params.application_name}
      | extra_params
    ]

    startup_msg = :pgo_protocol.encode_startup_message(startup_params)

    HandlerHelpers.sock_send(sock, startup_msg)
  end
707

708
  # Arms the socket with `active: @switch_active_count`, using the setopts
  # function that matches the socket's transport.
  @spec activate(Supavisor.sock()) :: :ok | {:error, term}
  defp activate({:gen_tcp, tcp_sock}), do: :inet.setopts(tcp_sock, active: @switch_active_count)

  defp activate({:ssl, ssl_sock}), do: :ssl.setopts(ssl_sock, active: @switch_active_count)
716

717
  # Reducer over the decoded authentication packets: receives one packet, the
  # accumulator built so far and the handler data.
  @spec handle_auth_pkts(map(), map(), map()) :: any()
  # ParameterStatus: collect the key/value pair under `:ps` in the accumulator.
  defp handle_auth_pkts(%{tag: :parameter_status, payload: {k, v}}, acc, _) do
    update_in(acc[:ps], fn collected -> Map.put(collected || %{}, k, v) end)
  end

  # ReadyForQuery: wrap the accumulator, with the database state added, in a tagged tuple.
  defp handle_auth_pkts(%{tag: :ready_for_query, payload: db_state}, acc, _) do
    {:ready_for_query, Map.put(acc, :db_state, db_state)}
  end
723

724
  # BackendKeyData: register this process in `Supavisor.Registry.PoolPids` with
  # the backend key data plus the connection's host, port and IP version, and
  # remember the payload in the accumulator. Outside proxy mode the backend pid
  # is also added to the Logger metadata.
  defp handle_auth_pkts(%{tag: :backend_key_data, payload: payload}, acc, data) do
    if data.mode != :proxy, do: Logger.metadata(backend_pid: payload[:pid])

    registry_key = self()

    %{host: host, port: port, ip_version: ip_version} = data.connection_params
    registry_value = Map.merge(payload, %{host: host, port: port, ip_version: ip_version})

    Registry.register(Supavisor.Registry.PoolPids, registry_key, registry_value)
    Logger.debug("DbHandler: Backend #{inspect(registry_key)} data: #{inspect(payload)}")

    Map.put(acc, :backend_key_data, payload)
  end
741

742
  # SASL authentication request: answer with a SCRAM-SHA-256 initial response
  # built from a fresh nonce and return `{:authentication_sasl, nonce}`. When
  # the method list cannot be decoded nothing is sent and the nonce is `nil`.
  defp handle_auth_pkts(%{payload: {:authentication_sasl_password, methods_b}}, _, data) do
    nonce =
      case Server.decode_string(methods_b) do
        {:ok, requested_method, _} ->
          Logger.debug("DbHandler: SASL method #{inspect(requested_method)}")

          fresh_nonce = :pgo_scram.get_nonce(16)

          client_first =
            :pgo_scram.get_client_first(data.connection_params.secrets.user, fresh_nonce)

          # Mechanism name, NUL terminator, 32-bit length of the client-first
          # message, then the message itself.
          initial_response = [
            "SCRAM-SHA-256",
            0,
            <<IO.iodata_length(client_first)::32-integer>>,
            client_first
          ]

          response_msg = :pgo_protocol.encode_scram_response_message(initial_response)
          :ok = HandlerHelpers.sock_send(data.sock, response_msg)

          fresh_nonce

        undecodable ->
          Logger.error("DbHandler: Undefined sasl method #{inspect(undecodable)}")
          nil
      end

    {:authentication_sasl, nonce}
  end
770

771
  # SCRAM server-first message: compute the client-final message from the
  # stored nonce and the connection secrets, send it, and return the server
  # proof together with the derived secrets.
  defp handle_auth_pkts(
         %{payload: {:authentication_server_first_message, server_first}},
         _,
         data
       ) do
    %{nonce: nonce, connection_params: %{secrets: secrets}} = data

    parsed_server_first = Helpers.parse_server_first(server_first, nonce)

    {client_final, server_proof, derived_secrets} =
      Helpers.get_client_final(secrets, parsed_server_first, nonce, secrets.user, "biws")

    client_final
    |> :pgo_protocol.encode_scram_response_message()
    |> then(fn final_msg -> :ok = HandlerHelpers.sock_send(data.sock, final_msg) end)

    {:authentication_server_first_message, server_proof, derived_secrets}
  end
795

796
  # SCRAM server-final message: remember it in the accumulator.
  defp handle_auth_pkts(
         %{payload: {:authentication_server_final_message, server_final}},
         acc,
         _data
       ) do
    Map.put(acc, :authentication_server_final_message, server_final)
  end

  # AuthenticationOk: flag it in the accumulator.
  defp handle_auth_pkts(%{payload: :authentication_ok}, acc, _data) do
    Map.put(acc, :authentication_ok, true)
  end
272✔
809

810
  # MD5 password request: send "md5" followed by md5(md5(password <> user) <> salt)
  # as a NUL-terminated password message.
  defp handle_auth_pkts(%{payload: {:authentication_md5_password, salt}} = dec_pkt, _, data) do
    Logger.debug("DbHandler: dec_pkt, #{inspect(dec_pkt, pretty: true)}")

    %PasswordSecrets{password: password, user: user} = data.connection_params.secrets

    inner_digest = Helpers.md5([password, user])
    body = ["md5", Helpers.md5([inner_digest, salt]), 0]

    # The length field counts itself (4 bytes) plus the body.
    body_len = IO.iodata_length(body) + 4
    :ok = HandlerHelpers.sock_send(data.sock, [?p, <<body_len::signed-32>>, body])

    :authentication_md5
  end

  # Cleartext password request: send the NUL-terminated password as a password message.
  defp handle_auth_pkts(%{payload: :authentication_cleartext_password} = dec_pkt, _, data) do
    Logger.debug("DbHandler: dec_pkt, #{inspect(dec_pkt, pretty: true)}")

    %PasswordSecrets{password: password} = data.connection_params.secrets

    body = <<password::binary, 0>>

    # The length field counts itself (4 bytes) plus the body.
    body_len = IO.iodata_length(body) + 4
    :ok = HandlerHelpers.sock_send(data.sock, [?p, <<body_len::signed-32>>, body])

    :authentication_cleartext
  end
832

833
  # ErrorResponse: replace the accumulator with the tagged error.
  defp handle_auth_pkts(%{tag: :error_response, payload: error}, _acc, _data) do
    {:error_response, error}
  end

  # Any other packet leaves the accumulator unchanged.
  defp handle_auth_pkts(_pkt, acc, _data) do
    acc
  end
1✔
837

838
  # Stores the derived secrets under the handler's id via
  # `Supavisor.UpstreamAuthentication.put_upstream_auth_secrets/2`, but only
  # when the data actually carries a non-nil `:derived_secrets` value.
  defp cache_derived_secrets(%{id: key, derived_secrets: secrets}) when secrets != nil do
    Supavisor.UpstreamAuthentication.put_upstream_auth_secrets(key, secrets)
  end

  # Nothing to cache: the key is absent or its value is nil.
  defp cache_derived_secrets(_without_secrets) do
    :ok
  end
210✔
844

845
  # Reacts to an upstream authentication failure. The reason argument is
  # ignored by both clauses.
  @spec handle_authentication_error(map(), String.t()) :: any()
  defp handle_authentication_error(%{mode: :proxy}, _reason) do
    # Proxy-mode handlers have nothing to invalidate.
    :ok
  end

  defp handle_authentication_error(%{mode: _non_proxy} = data, _reason) do
    # Invalidate the client authentication entry for this tenant/user, then
    # delete the upstream auth secrets stored under this handler's id.
    Supavisor.ClientAuthentication.invalidate_global(Supavisor.id(data.id, :tenant), data.user)
    Supavisor.UpstreamAuthentication.delete_upstream_auth_secrets(data.id)
  end
853

854
  # Relays bytes received from the database to the client socket and returns the
  # (possibly updated) handler data.
  #
  # With the "named_prepared_statements" feature flag enabled, the bytes go
  # through `process_backend_streaming/2` first and only the packets it returns
  # are sent. Otherwise the raw binary is forwarded unchanged.
  @spec handle_server_messages(binary(), map()) :: map()
  defp handle_server_messages(bin, data) do
    streaming? = FeatureFlag.enabled?(data.tenant_feature_flags, "named_prepared_statements")

    if streaming? do
      {:ok, next_data, outgoing} = process_backend_streaming(bin, data)

      case outgoing do
        [] -> :ok
        _ -> HandlerHelpers.sock_send(data.client_sock, outgoing)
      end

      next_data
    else
      HandlerHelpers.sock_send(data.client_sock, bin)
      data
    end
  end
870

871
  # A statement we already track also exists on the server, so the bind packet
  # can go out as-is. An untracked statement needs its parse packet sent first.
  #
  # In that second case the client never sent a parse itself, so the server's
  # parse response has to be intercepted; otherwise the client would receive a
  # message it does not expect.
  defp handle_prepared_statement_pkt({:bind_pkt, stmt_name, pkt, parse_pkt}, {iodata, data}) do
    known = data.prepared_statements

    if MapSet.member?(known, stmt_name) do
      {[pkt | iodata], data}
    else
      stream_state =
        MessageStreamer.update_state(data.stream_state, fn pending ->
          :queue.in({:intercept, :parse}, pending)
        end)

      tracked = MapSet.put(known, stmt_name)

      {[[parse_pkt, pkt] | iodata],
       %{data | stream_state: stream_state, prepared_statements: tracked}}
    end
  end
892

893
  # Forwards a close packet, stops tracking the statement, and queues a
  # `{:forward, :close}` entry on the stream state.
  defp handle_prepared_statement_pkt({:close_pkt, stmt_name, pkt}, {iodata, data}) do
    remaining = MapSet.delete(data.prepared_statements, stmt_name)

    stream_state =
      MessageStreamer.update_state(data.stream_state, fn pending ->
        :queue.in({:forward, :close}, pending)
      end)

    {[pkt | iodata], %{data | prepared_statements: remaining, stream_state: stream_state}}
  end
904

905
  # Describe packets are forwarded unchanged and leave the handler data as-is.
  defp handle_prepared_statement_pkt({:describe_pkt, _stmt_name, describe_pkt}, {out, data}),
    do: {[describe_pkt | out], data}
908

909
  # If statement ids ever become deterministic instead of unique per statement,
  # parse packets may need to be dropped here and a parse response returned.
  #
  # A parse for an already-tracked statement is not forwarded; an
  # `{:inject, :parse}` entry is queued instead. A parse for a new statement is
  # forwarded, tracked, and queued as `{:forward, :parse}`.
  defp handle_prepared_statement_pkt({:parse_pkt, stmt_name, pkt}, {iodata, data}) do
    known = data.prepared_statements
    already_known? = MapSet.member?(known, stmt_name)
    action = if already_known?, do: :inject, else: :forward

    stream_state =
      MessageStreamer.update_state(data.stream_state, fn pending ->
        :queue.in({action, :parse}, pending)
      end)

    if already_known? do
      {iodata, %{data | stream_state: stream_state}}
    else
      tracked = MapSet.put(known, stmt_name)
      {[pkt | iodata], %{data | prepared_statements: tracked, stream_state: stream_state}}
    end
  end
935

936
  # Evicts a random batch of prepared statements once the tracked set has
  # reached `PreparedStatements.backend_limit/0`.
  #
  # Returns `{close_pkts, prepared_statements}`: the close packets built for the
  # evicted statements (an empty list when nothing was evicted) and the set of
  # statements that remain tracked.
  defp evict_exceeding(%{prepared_statements: prepared_statements, id: id}) do
    limit = PreparedStatements.backend_limit()

    if MapSet.size(prepared_statements) >= limit do
      # Evict a fifth of the limit, but never fewer than one statement:
      # `div(limit, 5)` is 0 for limits below 5, which would evict nothing and
      # let the set grow past the limit indefinitely.
      count = max(div(limit, 5), 1)
      to_remove = prepared_statements |> Enum.take_random(count) |> MapSet.new()
      close_pkts = Enum.map(to_remove, &PreparedStatements.build_close_pkt/1)
      remaining = MapSet.difference(prepared_statements, to_remove)

      # Report what was actually evicted; this equals `count` whenever the set
      # holds at least `count` statements.
      Telem.prepared_statements_evicted(MapSet.size(to_remove), id)

      {close_pkts, remaining}
    else
      {[], prepared_statements}
    end
  end
951

952
  # Feeds backend bytes through `MessageStreamer.handle_packets/2`.
  #
  # On `{:ok, state, packets}` the new stream state is stored in `data` and
  # `{:ok, data, packets}` is returned. Any other result is returned unchanged.
  defp process_backend_streaming(bin, data) do
    with {:ok, next_state, outgoing} <- MessageStreamer.handle_packets(data.stream_state, bin) do
      {:ok, %{data | stream_state: next_state}, outgoing}
    end
  end
962

963
  # Looks up the upstream auth secrets stored for `id` and, when found, returns
  # `{:ok, conn_params}` with its `:secrets` field replaced by them. Any other
  # lookup result becomes `{:error, :no_secrets}`.
  defp get_connection_params_with_secrets(conn_params, id) do
    with {:ok, found} <- Supavisor.UpstreamAuthentication.get_upstream_auth_secrets(id) do
      {:ok, %{conn_params | secrets: found}}
    else
      _missing -> {:error, :no_secrets}
    end
  end
972

973
  # Handles a failed database connection attempt.
  #
  # Non-proxy handlers record the failure with the circuit breaker and store the
  # failure time in the tenant cache. In every case the state machine is told,
  # through an internal event, to terminate with a FATAL 08006 error while
  # keeping the pool.
  defp handle_connection_failure(reason, data) do
    if not data.proxy do
      Supavisor.CircuitBreaker.record_failure(data.tenant, :db_connection)
      failed_at = System.monotonic_time(:millisecond)
      Supavisor.TenantCache.put_last_connect_failure(data.id, failed_at)
    end

    message = "Failed to connect to database: #{inspect(reason)}"
    fatal = %{"S" => "FATAL", "C" => "08006", "M" => message}
    terminate = {:terminate_with_error, fatal, :keep_pool}

    {:keep_state_and_data, {:next_event, :internal, terminate}}
  end
987

988
  # Decides the handler's next state and returns `{state, data, actions}`.
  #
  # Proxy handlers go straight to `:connect`. Other handlers need upstream
  # secrets first: when available they are merged into the connection params and
  # the handler proceeds to `:connect`; when missing, the handler monitors the
  # local manager, registers itself as waiting, and enters
  # `:waiting_for_secrets`.
  defp resolve_secrets(data) do
    connect_now = {:next_event, :internal, :connect}

    if data.proxy do
      {:connect, data, connect_now}
    else
      case get_connection_params_with_secrets(data.connection_params, data.id) do
        {:error, :no_secrets} ->
          Logger.warning("DbHandler: Secrets not available, entering waiting state")
          # Monitor before registering, as in the original ordering.
          manager_ref = Process.monitor(Supavisor.get_local_manager(data.id))
          Supavisor.Manager.register_waiting_for_secrets(data.id, self())
          {:waiting_for_secrets, %{data | manager_ref: manager_ref}, []}

        {:ok, params} ->
          {:connect, %{data | connection_params: params}, connect_now}
      end
    end
  end
1006

1007
  # Returns how many milliseconds of the connect cooldown are left for `id`.
  #
  # Yields 0 when no connect failure is recorded. Otherwise yields
  # `@connect_cooldown_ms` minus the time elapsed since the recorded failure;
  # this is not clamped, so it is negative once the cooldown has passed.
  defp connect_cooldown_remaining(id) do
    case Supavisor.TenantCache.get_last_connect_failure(id) do
      nil ->
        0

      failed_at ->
        since_failure = System.monotonic_time(:millisecond) - failed_at
        @connect_cooldown_ms - since_failure
    end
  end
1017
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc