• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supavisor / 19370957114

14 Nov 2025 04:30PM UTC coverage: 62.682% (+1.4%) from 61.246%
19370957114

Pull #744

github

web-flow
Merge fd252a012 into 0224a24c8
Pull Request #744: fix(defrag): improve statems, caching, logs, circuit breaking

592 of 785 new or added lines in 22 files covered. (75.41%)

18 existing lines in 5 files now uncovered.

1809 of 2886 relevant lines covered (62.68%)

4508.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.32
/lib/supavisor/db_handler.ex
1
defmodule Supavisor.DbHandler do
2
  @moduledoc """
3
  This module contains functions to start a connection to the database, send
4
  requests to the database, and handle incoming messages from clients.
5

6
  The state machine uses the Supavisor.Protocol.Server module to decode messages
7
  from the database and sends messages to the client socket it received on checkout.
8
  """
9

10
  @behaviour :gen_statem
11

12
  require Logger
13
  require Supavisor.Protocol.Server, as: Server
14
  require Supavisor.Protocol.MessageStreamer, as: MessageStreamer
15

16
  alias Supavisor.Protocol.PreparedStatements
17

18
  alias Supavisor.{
19
    ClientHandler,
20
    FeatureFlag,
21
    HandlerHelpers,
22
    Helpers,
23
    Monitoring.Telem,
24
    Protocol.BackendMessageHandler,
25
    Protocol.Debug,
26
    Protocol.MessageStreamer,
27
    Protocol.Server
28
  }
29

30
  @type state :: :connect | :authentication | :idle | :busy
31

32
  @reconnect_timeout 2_500
33
  @reconnect_timeout_proxy 500
34
  @sock_closed [:tcp_closed, :ssl_closed]
35
  @proto [:tcp, :ssl]
36
  @switch_active_count Application.compile_env(:supavisor, :switch_active_count)
37

38
  @doc """
39
  Starts a DbHandler state machine
40

41
  Accepts two different types of args:
42

43
  TODO: details about required arguments in each
44
  - proxied
45
  - not proxied
46
  """
47
  def start_link(config),
48
    do: :gen_statem.start_link(__MODULE__, config, hibernate_after: 5_000)
304✔
49

50
  @doc """
51
  Checks out a DbHandler process
52

53
  Requires a client socket. The DbHandler will forward messages directly to the
54
  client socket when possible.
55

56
  Returns the server socket, which the client may write messages directly to.
57
  """
58
  @spec checkout(pid(), Supavisor.sock(), pid(), timeout()) ::
59
          {:ok, Supavisor.sock()} | {:error, {:exit, term()}}
60
  def checkout(pid, sock, caller, timeout \\ 15_000) do
61
    :gen_statem.call(pid, {:checkout, sock, caller}, timeout)
4,801✔
62
  catch
63
    :exit, reason ->
40✔
64
      {:error, {:exit, reason}}
65
  end
66

67
  @doc """
68
  Checks in a DbHandler process
69
  """
70
  @spec checkin(pid()) :: :ok
71
  def checkin(pid), do: :gen_statem.cast(pid, :checkin)
×
72

73
  @doc """
74
  Sends prepared statement packets to a DbHandler
75

76
  Different from most packets, prepared statements packets involve state at the DbHandler,
77
  and hence can't be sent directly to the database socket. Instead, they should be sent
78
  to the DbHandler through this function.
79
  """
80
  @spec handle_prepared_statement_pkts(pid, [PreparedStatements.handled_pkt()]) :: :ok
81
  def handle_prepared_statement_pkts(pid, pkts) do
82
    :gen_statem.call(pid, {:handle_ps_pkts, pkts}, 15_000)
2,884✔
83
  end
84

85
  @doc """
86
  Returns the state and the mode of the DbHandler
87
  """
88
  @spec get_state_and_mode(pid()) :: {:ok, {state, Supavisor.mode()}} | {:error, term()}
UNCOV
89
  def get_state_and_mode(pid) do
×
90
    {:ok, :gen_statem.call(pid, :get_state_and_mode, 5_000)}
91
  catch
UNCOV
92
    error, reason -> {:error, {error, reason}}
×
93
  end
94

95
  @doc """
96
  Stops a DbHandler
97
  """
98
  @spec stop(pid()) :: :ok
99
  def stop(pid) do
UNCOV
100
    Logger.debug("DbHandler: Stop pid #{inspect(pid)}")
×
UNCOV
101
    :gen_statem.stop(pid, {:shutdown, :client_termination}, 5_000)
×
102
  end
103

104
  @impl true
105
  def init(args) do
106
    Process.flag(:trap_exit, true)
305✔
107

108
    {id, config} =
305✔
109
      case args do
NEW
110
        %{proxy: true} ->
×
NEW
111
          {args.id, args}
×
112

113
        %{} ->
114
          config = Supavisor.Manager.get_config(args.id)
305✔
115
          {args.id, config}
305✔
116
      end
117

118
    Helpers.set_log_level(config.log_level)
305✔
119
    Helpers.set_max_heap_size(90)
305✔
120

121
    {_, tenant} = config.tenant
305✔
122
    Logger.metadata(project: tenant, user: config.user, mode: config.mode)
305✔
123

124
    auth = get_auth_with_secrets(config.auth, tenant, config.user)
305✔
125

126
    data =
305✔
127
      %{
128
        id: id,
129
        sock: nil,
130
        auth: auth,
131
        user: config.user,
305✔
132
        tenant: config.tenant,
305✔
133
        tenant_feature_flags: config.tenant_feature_flags,
305✔
134
        db_state: nil,
135
        parameter_status: %{},
136
        nonce: nil,
137
        server_proof: nil,
138
        stats: %{},
139
        client_stats: %{},
140
        prepared_statements: MapSet.new(),
141
        proxy: Map.get(config, :proxy, false),
142
        stream_state: MessageStreamer.new_stream_state(BackendMessageHandler),
143
        mode: config.mode,
305✔
144
        replica_type: config.replica_type,
305✔
145
        caller: Map.get(config, :caller) || nil,
305✔
146
        client_sock: Map.get(config, :client_sock) || nil,
305✔
147
        reconnect_retries: 0
148
      }
149

150
    Telem.handler_action(:db_handler, :started, id)
305✔
151
    {:ok, :connect, data, {:next_event, :internal, :connect}}
305✔
152
  end
153

154
  @impl true
155
  def callback_mode, do: [:handle_event_function]
304✔
156

157
  @impl true
158
  def handle_event(:internal, :connect, :connect, %{auth: auth} = data) do
159
    Logger.debug("DbHandler: Try to connect to DB")
308✔
160

161
    sock_opts = [
308✔
162
      auth.ip_version,
308✔
163
      mode: :binary,
164
      packet: :raw,
165
      nodelay: true,
166
      active: false
167
    ]
168

169
    Telem.handler_action(:db_handler, :db_connection, data.id)
308✔
170

171
    case :gen_tcp.connect(auth.host, auth.port, sock_opts) do
308✔
172
      {:ok, sock} ->
173
        Logger.debug("DbHandler: auth #{inspect(auth, pretty: true)}")
306✔
174

175
        case try_ssl_handshake({:gen_tcp, sock}, auth) do
306✔
176
          {:ok, sock} ->
177
            tenant = if data.mode == :proxy, do: Supavisor.tenant(data.id)
305✔
178
            search_path = Supavisor.search_path(data.id)
305✔
179

180
            case send_startup(sock, auth, tenant, search_path) do
305✔
181
              :ok ->
182
                :ok = activate(sock)
305✔
183
                {:next_state, :authentication, %{data | sock: sock}}
305✔
184

185
              {:error, reason} ->
186
                Logger.error("DbHandler: Send startup error #{inspect(reason)}")
×
NEW
187
                handle_connection_failure(reason, data)
×
188
            end
189

190
          {:error, reason} ->
191
            Logger.error("DbHandler: Handshake error #{inspect(reason)}")
1✔
192
            maybe_reconnect(reason, data)
1✔
193
        end
194

195
      other ->
196
        Logger.error(
2✔
197
          "DbHandler: Connection failed #{inspect(other)} to #{inspect(auth.host)}:#{inspect(auth.port)}"
2✔
198
        )
199

200
        handle_connection_failure(other, data)
2✔
201
    end
202
  end
203

204
  def handle_event(:state_timeout, :connect, _state, data) do
205
    retry = data.reconnect_retries
×
206
    Logger.warning("DbHandler: Reconnect #{retry} to DB")
×
207

208
    {:keep_state, %{data | reconnect_retries: retry + 1}, {:next_event, :internal, :connect}}
×
209
  end
210

211
  def handle_event(:info, {proto, _, bin}, :authentication, data) when proto in @proto do
212
    {:ok, dec_pkt, _} = Server.decode(bin)
896✔
213
    Logger.debug("DbHandler: dec_pkt, #{inspect(dec_pkt, pretty: true)}")
896✔
214

215
    resp = Enum.reduce(dec_pkt, %{}, &handle_auth_pkts(&1, &2, data))
896✔
216

217
    case resp do
896✔
218
      {:authentication_sasl, nonce} ->
298✔
219
        {:keep_state, %{data | nonce: nonce}}
220

221
      {:authentication_server_first_message, server_proof} ->
297✔
222
        {:keep_state, %{data | server_proof: server_proof}}
223

224
      %{authentication_server_final_message: _server_final} ->
×
225
        :keep_state_and_data
226

227
      %{authentication_ok: true} ->
×
228
        :keep_state_and_data
229

230
      :authentication_md5 ->
2✔
231
        {:keep_state, data}
232

233
      :authentication_cleartext ->
1✔
234
        {:keep_state, data}
235

236
      {:error_response, %{"S" => "FATAL", "C" => "28P01"} = error} ->
237
        reason = error["M"] || "Authentication failed"
1✔
238
        handle_authentication_error(data, reason)
1✔
239
        Logger.error("DbHandler: Auth error #{inspect(error)}")
1✔
240
        {:stop, :invalid_password, data}
1✔
241

242
      {:error_response, %{"S" => "FATAL", "C" => "3D000"} = error} ->
243
        Logger.error("DbHandler: Database does not exist: #{inspect(error)}")
2✔
244
        encode_and_forward_error(error, data)
2✔
245

246
        if not data.proxy do
2✔
247
          Supavisor.Manager.terminate_pool(data.id, error)
2✔
248
        end
249

250
        {:stop, :normal, data}
2✔
251

252
      {:error_response, %{"S" => "FATAL", "C" => "42501"} = error} ->
NEW
253
        Logger.error("DbHandler: Insufficient privilege: #{inspect(error)}")
×
NEW
254
        encode_and_forward_error(error, data)
×
255

NEW
256
        if not data.proxy do
×
NEW
257
          Supavisor.Manager.terminate_pool(data.id, error)
×
258
        end
259

NEW
260
        {:stop, :normal, data}
×
261

262
      {:error_response, error} ->
263
        Logger.error("DbHandler: Error response during auth: #{inspect(error)}")
1✔
264
        encode_and_forward_error(error, data)
1✔
265
        {:stop, :normal}
266

267
      {:ready_for_query, acc} ->
268
        ps = acc.ps
293✔
269

270
        Logger.debug(
293✔
271
          "DbHandler: DB ready_for_query: #{inspect(acc.db_state)} #{inspect(ps, pretty: true)}"
293✔
272
        )
273

274
        if data.mode == :proxy do
293✔
275
          bin_ps = Server.encode_parameter_status(ps)
×
276
          send(data.caller, {:parameter_status, bin_ps})
×
277
        else
278
          Supavisor.set_parameter_status(data.id, ps)
293✔
279
        end
280

281
        {:next_state, :idle, %{data | parameter_status: ps, reconnect_retries: 0}}
293✔
282

283
      other ->
284
        Logger.error("DbHandler: Undefined auth response #{inspect(other)}")
1✔
285
        {:stop, :auth_error, data}
1✔
286
    end
287
  end
288

289
  # the process received message from db while idle
290
  def handle_event(:info, {proto, _, bin}, :idle, _data) when proto in @proto do
291
    Logger.debug("DbHandler: Got db response #{inspect(bin)} when idle")
1✔
292
    :keep_state_and_data
293
  end
294

295
  # forward the message to the client
296
  def handle_event(:info, {proto, _, bin}, :busy, %{caller: caller} = data)
297
      when is_pid(caller) and proto in @proto do
298
    Logger.debug("DbHandler: Got messages: #{Debug.packet_to_string(bin, :backend)}")
20,648✔
299

300
    if String.ends_with?(bin, Server.ready_for_query()) do
20,648✔
301
      HandlerHelpers.activate(data.sock)
6,305✔
302

303
      {_, stats} =
6,305✔
304
        if data.mode == :proxy,
6,305✔
305
          do: {nil, data.stats},
×
306
          else: Telem.network_usage(:db, data.sock, data.id, data.stats)
6,305✔
307

308
      # in transaction mode, we need to notify the client when the transaction is finished,
309
      # after which it will unlink the direct db connection process from itself.
310
      if data.mode == :transaction do
6,305✔
311
        ClientHandler.db_status(data.caller, :ready_for_query)
4,518✔
312
        data = handle_server_messages(bin, data)
4,518✔
313

314
        {:next_state, :idle, %{data | stats: stats, caller: nil, client_sock: nil}}
4,518✔
315
      else
316
        data = handle_server_messages(bin, data)
1,787✔
317

318
        {_, client_stats} =
1,787✔
319
          if data.mode == :proxy,
1,787✔
NEW
320
            do: {nil, data.client_stats},
×
321
            else: Telem.network_usage(:client, data.client_sock, data.id, data.client_stats)
1,787✔
322

323
        {:keep_state, %{data | stats: stats, client_stats: client_stats}}
324
      end
325
    else
326
      data = handle_server_messages(bin, data)
14,343✔
327
      {:keep_state, data}
328
    end
329
  end
330

331
  def handle_event({:call, from}, {:handle_ps_pkts, pkts}, :busy, data) do
332
    {iodata, data} = Enum.reduce(pkts, {[], data}, &handle_prepared_statement_pkt/2)
2,884✔
333

334
    {close_pkts, prepared_statements} = evict_exceeding(data)
2,884✔
335

336
    :ok = HandlerHelpers.sock_send(data.sock, Enum.reverse([close_pkts | iodata]))
2,884✔
337

338
    data = %{
2,884✔
339
      data
340
      | stream_state:
341
          Enum.reduce(close_pkts, data.stream_state, fn _, stream_state ->
2,884✔
342
            MessageStreamer.update_state(stream_state, fn queue ->
520✔
343
              :queue.in({:intercept, :close}, queue)
520✔
344
            end)
345
          end),
346
        prepared_statements: prepared_statements
347
    }
348

349
    {:keep_state, data, {:reply, from, :ok}}
2,884✔
350
  end
351

352
  def handle_event({:call, from}, {:checkout, sock, caller}, state, data) do
353
    Logger.debug("DbHandler: checkout call when state was #{state}: #{inspect(caller)}")
5,016✔
354

355
    if state in [:idle, :busy] do
5,016✔
356
      {:next_state, :busy, %{data | client_sock: sock, caller: caller},
4,760✔
357
       {:reply, from, {:ok, data.sock}}}
4,760✔
358
    else
359
      {:keep_state_and_data, :postpone}
360
    end
361
  end
362

363
  def handle_event({:call, from}, :ps, :busy, data) do
364
    Logger.debug("DbHandler: get parameter status")
×
365
    {:keep_state_and_data, {:reply, from, data.parameter_status}}
×
366
  end
367

368
  def handle_event(_, {closed, _}, :busy, data) when closed in @sock_closed do
369
    {:stop, {:shutdown, :db_termination}, data}
×
370
  end
371

372
  def handle_event(_, {closed, _}, state, data) when closed in @sock_closed do
373
    Logger.error("DbHandler: Connection closed when state was #{state}")
1✔
374

375
    if Application.get_env(:supavisor, :reconnect_on_db_close),
1✔
376
      do: {:next_state, :connect, data, {:state_timeout, reconnect_timeout(data), :connect}},
×
377
      else: {:stop, {:shutdown, :db_termination}, data}
1✔
378
  end
379

380
  # linked client_handler went down
381
  def handle_event(_, {:EXIT, pid, reason}, _state, data) do
382
    if reason != :normal do
242✔
383
      Logger.error(
6✔
384
        "DbHandler: ClientHandler #{inspect(pid)} went down with reason #{inspect(reason)}"
385
      )
386
    end
387

388
    HandlerHelpers.sock_send(data.sock, Server.terminate_message())
242✔
389
    HandlerHelpers.sock_close(data.sock)
242✔
390
    {:stop, :normal}
391
  end
392

UNCOV
393
  def handle_event({:call, from}, :get_state_and_mode, state, data) do
×
UNCOV
394
    {:keep_state_and_data, {:reply, from, {state, data.mode}}}
×
395
  end
396

397
  def handle_event(:info, {event, _socket}, _, data) when event in [:tcp_passive, :ssl_passive] do
NEW
398
    HandlerHelpers.setopts(data.sock, active: @switch_active_count)
×
399
    :keep_state_and_data
400
  end
401

402
  def handle_event(type, content, state, data) do
403
    msg = [
×
404
      {"type", type},
405
      {"content", content},
406
      {"state", state},
407
      {"data", data}
408
    ]
409

410
    Logger.debug("DbHandler: Undefined msg: #{inspect(msg, pretty: true)}")
×
411

412
    :keep_state_and_data
413
  end
414

415
  @impl true
416
  def terminate(:shutdown, _state, data) do
417
    Telem.handler_action(:db_handler, :stopped, data.id)
39✔
418
    :ok
419
  end
420

421
  def terminate(reason, state, data) do
422
    Telem.handler_action(:db_handler, :stopped, data.id)
245✔
423

424
    case reason do
245✔
425
      :normal ->
244✔
426
        :ok
427

NEW
428
      :shutdown ->
×
429
        :ok
430

431
      reason ->
432
        Logger.error(
1✔
433
          "DbHandler: Terminating with reason #{inspect(reason)} when state was #{inspect(state)}"
434
        )
435
    end
436
  end
437

438
  @spec encode_and_forward_error(map(), map()) :: :ok | :noop
439
  defp encode_and_forward_error(message, data) do
440
    case data do
3✔
441
      %{client_sock: sock} when not is_nil(sock) ->
442
        HandlerHelpers.sock_send(
1✔
443
          sock,
444
          Server.encode_error_message(message)
445
        )
446

447
      _other ->
2✔
448
        :noop
449
    end
450
  end
451

452
  @spec try_ssl_handshake(Supavisor.tcp_sock(), map) :: {:ok, Supavisor.sock()} | {:error, term()}
453
  defp try_ssl_handshake(sock, %{upstream_ssl: true} = auth) do
454
    case HandlerHelpers.sock_send(sock, Server.ssl_request()) do
1✔
455
      :ok -> ssl_recv(sock, auth)
1✔
456
      error -> error
×
457
    end
458
  end
459

460
  defp try_ssl_handshake(sock, _), do: {:ok, sock}
305✔
461

462
  @spec ssl_recv(Supavisor.tcp_sock(), map) :: {:ok, Supavisor.ssl_sock()} | {:error, term}
463
  defp ssl_recv({:gen_tcp, sock} = s, auth) do
464
    case :gen_tcp.recv(sock, 1, 15_000) do
1✔
465
      {:ok, <<?S>>} -> ssl_connect(s, auth)
×
466
      {:ok, <<?N>>} -> {:error, :ssl_not_available}
1✔
467
      {:error, _} = error -> error
×
468
    end
469
  end
470

471
  @spec ssl_connect(Supavisor.tcp_sock(), map, pos_integer) ::
472
          {:ok, Supavisor.ssl_sock()} | {:error, term}
473
  defp ssl_connect({:gen_tcp, sock}, auth, timeout \\ 5000) do
474
    opts =
×
475
      case auth.upstream_verify do
×
476
        :peer ->
×
477
          [
478
            verify: :verify_peer,
479
            cacerts: [auth.upstream_tls_ca],
×
480
            # unclear behavior on pg14
481
            server_name_indication: auth.sni_hostname || auth.host,
×
482
            customize_hostname_check: [{:match_fun, fn _, _ -> true end}]
×
483
          ]
484

485
        :none ->
×
486
          [verify: :verify_none]
487
      end
488

489
    case :ssl.connect(sock, opts, timeout) do
×
490
      {:ok, ssl_sock} ->
×
491
        {:ok, {:ssl, ssl_sock}}
492

493
      {:error, reason} ->
×
494
        {:error, reason}
495
    end
496
  end
497

498
  @spec send_startup(Supavisor.sock(), map(), String.t() | nil, String.t() | nil) ::
499
          :ok | {:error, term}
500
  def send_startup(sock, auth, tenant, search_path) do
501
    user =
305✔
502
      if is_nil(tenant), do: get_user(auth), else: "#{get_user(auth)}.#{tenant}"
305✔
503

504
    msg =
305✔
505
      :pgo_protocol.encode_startup_message(
506
        [
507
          {"user", user},
508
          {"database", auth.database},
305✔
509
          {"application_name", auth.application_name}
305✔
510
        ] ++ if(search_path, do: [{"options", "--search_path=#{search_path}"}], else: [])
305✔
511
      )
512

513
    HandlerHelpers.sock_send(sock, msg)
305✔
514
  end
515

516
  @spec activate(Supavisor.sock()) :: :ok | {:error, term}
517
  defp activate({:gen_tcp, sock}) do
518
    :inet.setopts(sock, active: @switch_active_count)
305✔
519
  end
520

521
  defp activate({:ssl, sock}) do
NEW
522
    :ssl.setopts(sock, active: @switch_active_count)
×
523
  end
524

525
  defp get_user(auth) do
526
    {_method, secrets_fn} = auth.secrets
602✔
527
    secrets_map = secrets_fn.()
602✔
528
    secrets_map.user
602✔
529
  end
530

531
  @spec handle_auth_pkts(map(), map(), map()) :: any()
532
  defp handle_auth_pkts(%{tag: :parameter_status, payload: {k, v}}, acc, _),
533
    do: update_in(acc, [:ps], fn ps -> Map.put(ps || %{}, k, v) end)
3,809✔
534

535
  defp handle_auth_pkts(%{tag: :ready_for_query, payload: db_state}, acc, _),
293✔
536
    do: {:ready_for_query, Map.put(acc, :db_state, db_state)}
537

538
  defp handle_auth_pkts(%{tag: :backend_key_data, payload: payload}, acc, data) do
539
    if data.mode != :proxy do
293✔
540
      Logger.metadata(backend_pid: payload[:pid])
293✔
541
    end
542

543
    key = self()
293✔
544
    conn = %{host: data.auth.host, port: data.auth.port, ip_version: data.auth.ip_version}
293✔
545
    Registry.register(Supavisor.Registry.PoolPids, key, Map.merge(payload, conn))
293✔
546
    Logger.debug("DbHandler: Backend #{inspect(key)} data: #{inspect(payload)}")
293✔
547
    Map.put(acc, :backend_key_data, payload)
293✔
548
  end
549

550
  defp handle_auth_pkts(%{payload: {:authentication_sasl_password, methods_b}}, _, data) do
551
    nonce =
298✔
552
      case Server.decode_string(methods_b) do
553
        {:ok, req_method, _} ->
554
          Logger.debug("DbHandler: SASL method #{inspect(req_method)}")
297✔
555
          nonce = :pgo_scram.get_nonce(16)
297✔
556
          user = get_user(data.auth)
297✔
557
          client_first = :pgo_scram.get_client_first(user, nonce)
297✔
558
          client_first_size = IO.iodata_length(client_first)
297✔
559

560
          sasl_initial_response = [
297✔
561
            "SCRAM-SHA-256",
562
            0,
563
            <<client_first_size::32-integer>>,
564
            client_first
565
          ]
566

567
          bin = :pgo_protocol.encode_scram_response_message(sasl_initial_response)
297✔
568
          :ok = HandlerHelpers.sock_send(data.sock, bin)
297✔
569
          nonce
297✔
570

571
        other ->
572
          Logger.error("DbHandler: Undefined sasl method #{inspect(other)}")
1✔
573
          nil
574
      end
575

576
    {:authentication_sasl, nonce}
577
  end
578

579
  defp handle_auth_pkts(
580
         %{payload: {:authentication_server_first_message, server_first}},
581
         _,
582
         data
583
       ) do
584
    nonce = data.nonce
297✔
585
    server_first_parts = Helpers.parse_server_first(server_first, nonce)
297✔
586

587
    {_method, secrets_fn} = data.auth.secrets
297✔
588
    secrets = secrets_fn.()
297✔
589

590
    {client_final_message, server_proof} =
297✔
591
      Helpers.get_client_final(
592
        data.auth.method,
297✔
593
        secrets,
594
        server_first_parts,
595
        nonce,
596
        secrets.user,
297✔
597
        "biws"
598
      )
599

600
    bin = :pgo_protocol.encode_scram_response_message(client_final_message)
297✔
601
    :ok = HandlerHelpers.sock_send(data.sock, bin)
297✔
602

603
    {:authentication_server_first_message, server_proof}
604
  end
605

606
  defp handle_auth_pkts(
607
         %{payload: {:authentication_server_final_message, server_final}},
608
         acc,
609
         _data
610
       ),
611
       do: Map.put(acc, :authentication_server_final_message, server_final)
295✔
612

613
  defp handle_auth_pkts(
614
         %{payload: :authentication_ok},
615
         acc,
616
         _data
617
       ),
618
       do: Map.put(acc, :authentication_ok, true)
295✔
619

620
  defp handle_auth_pkts(%{payload: {:authentication_md5_password, salt}} = dec_pkt, _, data) do
621
    Logger.debug("DbHandler: dec_pkt, #{inspect(dec_pkt, pretty: true)}")
2✔
622

623
    {_method, secrets_fn} = data.auth.secrets
2✔
624
    secrets = secrets_fn.()
2✔
625

626
    digest =
2✔
627
      if data.auth.method == :password do
2✔
628
        Helpers.md5([secrets.password, secrets.user])
1✔
629
      else
630
        secrets.password
1✔
631
      end
632

633
    payload = ["md5", Helpers.md5([digest, salt]), 0]
2✔
634
    bin = [?p, <<IO.iodata_length(payload) + 4::signed-32>>, payload]
2✔
635
    :ok = HandlerHelpers.sock_send(data.sock, bin)
2✔
636
    :authentication_md5
637
  end
638

639
  defp handle_auth_pkts(%{payload: :authentication_cleartext_password} = dec_pkt, _, data) do
640
    Logger.debug("DbHandler: dec_pkt, #{inspect(dec_pkt, pretty: true)}")
1✔
641

642
    {_method, secrets_fn} = data.auth.secrets
1✔
643
    secrets = secrets_fn.()
1✔
644

645
    payload = <<secrets.password::binary, 0>>
1✔
646
    bin = [?p, <<IO.iodata_length(payload) + 4::signed-32>>, payload]
1✔
647
    :ok = HandlerHelpers.sock_send(data.sock, bin)
1✔
648
    :authentication_cleartext
649
  end
650

651
  defp handle_auth_pkts(%{tag: :error_response, payload: error}, _acc, _data),
4✔
652
    do: {:error_response, error}
653

654
  defp handle_auth_pkts(_e, acc, _data), do: acc
1✔
655

656
  @spec handle_authentication_error(map(), String.t()) :: any()
NEW
657
  defp handle_authentication_error(%{mode: :proxy}, _reason), do: :ok
×
658

659
  defp handle_authentication_error(%{mode: _other} = data, _reason) do
660
    tenant = Supavisor.tenant(data.id)
1✔
661
    Supavisor.SecretCache.invalidate(tenant, data.user)
1✔
662
  end
663

664
  @spec reconnect_timeout(map()) :: pos_integer()
UNCOV
665
  def reconnect_timeout(%{proxy: true}),
×
666
    do: @reconnect_timeout_proxy
667

668
  def reconnect_timeout(_),
1✔
669
    do: @reconnect_timeout
670

671
  @spec handle_server_messages(binary(), map()) :: map()
672
  defp handle_server_messages(bin, data) do
673
    if FeatureFlag.enabled?(data.tenant_feature_flags, "named_prepared_statements") do
20,648✔
674
      {:ok, updated_data, packets_to_send} = process_backend_streaming(bin, data)
20,594✔
675

676
      if packets_to_send != [] do
20,594✔
677
        HandlerHelpers.sock_send(data.client_sock, packets_to_send)
19,780✔
678
      end
679

680
      updated_data
20,594✔
681
    else
682
      HandlerHelpers.sock_send(data.client_sock, bin)
54✔
683

684
      data
54✔
685
    end
686
  end
687

688
  # If the prepared statement exists for us, it exists for the server, so we just send the
689
  # bind to the socket. If it doesn't, we must send the parse pkt first.
690
  #
691
  # If we received a bind without a parse, we need to intercept the parse response, otherwise,
692
  # the client will receive an unexpected message.
693
  defp handle_prepared_statement_pkt({:bind_pkt, stmt_name, pkt, parse_pkt}, {iodata, data}) do
694
    if stmt_name in data.prepared_statements do
1,532✔
695
      {[pkt | iodata], data}
696
    else
697
      new_data = %{
365✔
698
        data
699
        | stream_state:
700
            MessageStreamer.update_state(data.stream_state, fn queue ->
365✔
701
              :queue.in({:intercept, :parse}, queue)
365✔
702
            end),
703
          prepared_statements: MapSet.put(data.prepared_statements, stmt_name)
365✔
704
      }
705

706
      {[[parse_pkt, pkt] | iodata], new_data}
707
    end
708
  end
709

710
  defp handle_prepared_statement_pkt({:close_pkt, stmt_name, pkt}, {iodata, data}) do
1,332✔
711
    {[pkt | iodata],
712
     %{
713
       data
714
       | prepared_statements: MapSet.delete(data.prepared_statements, stmt_name),
1,332✔
715
         stream_state:
716
           MessageStreamer.update_state(data.stream_state, fn queue ->
1,332✔
717
             :queue.in({:forward, :close}, queue)
1,332✔
718
           end)
719
     }}
720
  end
721

722
  defp handle_prepared_statement_pkt({:describe_pkt, _stmt_name, pkt}, {iodata, data}) do
1,329✔
723
    {[pkt | iodata], data}
724
  end
725

726
  # If we stop generating unique id per statement, and instead do deterministic ids,
727
  # we need to potentially drop parse pkts and return a parse response
728
  defp handle_prepared_statement_pkt({:parse_pkt, stmt_name, pkt}, {iodata, data}) do
729
    if stmt_name in data.prepared_statements do
1,329✔
730
      {iodata,
731
       %{
732
         data
733
         | stream_state:
734
             MessageStreamer.update_state(data.stream_state, fn queue ->
311✔
735
               :queue.in({:inject, :parse}, queue)
311✔
736
             end)
737
       }}
738
    else
739
      prepared_statements = MapSet.put(data.prepared_statements, stmt_name)
1,018✔
740

741
      {[pkt | iodata],
742
       %{
743
         data
744
         | prepared_statements: prepared_statements,
745
           stream_state:
746
             MessageStreamer.update_state(data.stream_state, fn queue ->
1,018✔
747
               :queue.in({:forward, :parse}, queue)
1,018✔
748
             end)
749
       }}
750
    end
751
  end
752

753
  defp evict_exceeding(%{prepared_statements: prepared_statements, id: id}) do
754
    limit = PreparedStatements.backend_limit()
2,884✔
755

756
    if MapSet.size(prepared_statements) >= limit do
2,884✔
757
      count = div(limit, 5)
13✔
758
      to_remove = Enum.take_random(prepared_statements, count) |> MapSet.new()
13✔
759
      close_pkts = Enum.map(to_remove, &PreparedStatements.build_close_pkt/1)
13✔
760
      prepared_statements = MapSet.difference(prepared_statements, to_remove)
13✔
761
      Telem.prepared_statements_evicted(count, id)
13✔
762

763
      {close_pkts, prepared_statements}
764
    else
765
      {[], prepared_statements}
766
    end
767
  end
768

769
  defp maybe_reconnect(reason, data) do
770
    max_reconnect_retries = Application.get_env(:supavisor, :reconnect_retries)
3✔
771
    data = %{data | reconnect_retries: data.reconnect_retries + 1}
3✔
772

773
    if data.reconnect_retries > max_reconnect_retries do
3✔
774
      {:stop, {:failed_to_connect, reason}}
775
    else
776
      {:keep_state, data, {:state_timeout, reconnect_timeout(data), :connect}}
1✔
777
    end
778
  end
779

780
  defp process_backend_streaming(bin, data) do
781
    case MessageStreamer.handle_packets(data.stream_state, bin) do
20,594✔
782
      {:ok, new_stream_state, packets} ->
783
        updated_data = %{data | stream_state: new_stream_state}
20,594✔
784
        {:ok, updated_data, packets}
20,594✔
785

786
      err ->
787
        err
×
788
    end
789
  end
790

791
  defp get_auth_with_secrets(auth, tenant, user) do
792
    case Supavisor.SecretCache.get_upstream_auth_secrets(tenant, user) do
305✔
793
      {:ok, upstream_auth_secrets} ->
794
        Map.put(auth, :secrets, upstream_auth_secrets)
305✔
795

796
      _other ->
NEW
797
        raise "Upstream connection secrets not found"
×
798
    end
799
  end
800

801
  defp handle_connection_failure(reason, data) do
802
    if not data.proxy do
2✔
803
      {_, tenant} = data.tenant
2✔
804
      Supavisor.CircuitBreaker.record_failure(tenant, :db_connection)
2✔
805
    end
806

807
    maybe_reconnect(reason, data)
2✔
808
  end
809
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc