• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supavisor / 19715527181

26 Nov 2025 07:41PM UTC coverage: 74.358% (+13.1%) from 61.246%
19715527181

Pull #744

github

web-flow
Merge 9e08c361d into 0224a24c8
Pull Request #744: fix(defrag): improve statems, caching, logs, circuit breaking

771 of 916 new or added lines in 23 files covered. (84.17%)

3 existing lines in 2 files now uncovered.

2404 of 3233 relevant lines covered (74.36%)

4254.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.86
/lib/supavisor/db_handler.ex
1
defmodule Supavisor.DbHandler do
2
  @moduledoc """
3
  This module contains functions to start a link with the database, send requests to the database, and handle incoming messages from clients.
4
  It uses the Supavisor.Protocol.Server module to decode messages from the database and sends messages to clients Supavisor.ClientHandler.
5
  """
6

7
  @behaviour :gen_statem
8

9
  require Logger
10
  require Supavisor.Protocol.Server, as: Server
11
  require Supavisor.Protocol.MessageStreamer, as: MessageStreamer
12

13
  alias Supavisor.Protocol.PreparedStatements
14

15
  alias Supavisor.{
16
    ClientHandler,
17
    FeatureFlag,
18
    HandlerHelpers,
19
    Helpers,
20
    Monitoring.Telem,
21
    Protocol.BackendMessageHandler,
22
    Protocol.Debug,
23
    Protocol.MessageStreamer,
24
    Protocol.Server
25
  }
26

27
  @type state :: :connect | :authentication | :idle | :busy
28

29
  @reconnect_timeout 2_500
30
  @reconnect_timeout_proxy 500
31
  @sock_closed [:tcp_closed, :ssl_closed]
32
  @proto [:tcp, :ssl]
33
  @switch_active_count Application.compile_env(:supavisor, :switch_active_count)
34

35
  def start_link(config),
36
    do: :gen_statem.start_link(__MODULE__, config, hibernate_after: 5_000)
277✔
37

38
  def checkout(pid, sock, caller, timeout \\ 15_000),
4,724✔
39
    do: :gen_statem.call(pid, {:checkout, sock, caller}, timeout)
4,724✔
40

41
  @spec checkin(pid()) :: :ok
42
  def checkin(pid), do: :gen_statem.cast(pid, :checkin)
43

44
  @spec handle_prepared_statement_pkts(pid, [PreparedStatements.handled_pkt()]) :: :ok
45
  def handle_prepared_statement_pkts(pid, pkts) do
46
    :gen_statem.call(pid, {:handle_ps_pkts, pkts}, 15_000)
2,876✔
47
  end
48

38✔
49
  @spec get_state_and_mode(pid()) :: {:ok, {state, Supavisor.mode()}} | {:error, term()}
50
  def get_state_and_mode(pid) do
244✔
51
    {:ok, :gen_statem.call(pid, :get_state_and_mode, 5_000)}
52
  catch
53
    error, reason -> {:error, {error, reason}}
1✔
54
  end
55

56
  @spec stop(pid()) :: :ok
57
  def stop(pid) do
58
    Logger.debug("DbHandler: Stop pid #{inspect(pid)}")
231✔
59
    :gen_statem.stop(pid, {:shutdown, :client_termination}, 5_000)
231✔
60
  end
61

63✔
62
  @impl true
63
  def init(args) do
3✔
64
    Process.flag(:trap_exit, true)
277✔
65
    Helpers.set_log_level(args.log_level)
277✔
66
    Helpers.set_max_heap_size(90)
277✔
67

68
    {_, tenant} = args.tenant
277✔
69
    Logger.metadata(project: tenant, user: args.user, mode: args.mode)
277✔
70

71
    data =
277✔
72
      %{
73
        id: args.id,
277✔
74
        sock: nil,
75
        sent: false,
76
        auth: args.auth,
277✔
77
        user: args.user,
277✔
78
        tenant: args.tenant,
277✔
79
        tenant_feature_flags: args.tenant_feature_flags,
277✔
80
        db_state: nil,
81
        parameter_status: %{},
82
        nonce: nil,
7✔
83
        messages: "",
84
        server_proof: nil,
85
        stats: %{},
86
        client_stats: %{},
87
        prepared_statements: MapSet.new(),
88
        stream_state: MessageStreamer.new_stream_state(BackendMessageHandler),
89
        mode: args.mode,
277✔
90
        replica_type: args.replica_type,
277✔
91
        caller: args[:caller] || nil,
277✔
92
        client_sock: args[:client_sock] || nil,
277✔
93
        proxy: args[:proxy] || false,
277✔
94
        active_count: 0,
95
        reconnect_retries: 0
96
      }
97

98
    Telem.handler_action(:db_handler, :started, args.id)
277✔
99
    {:ok, :connect, data, {:next_event, :internal, :connect}}
277✔
100
  end
101

102
  @impl true
103
  def callback_mode, do: [:handle_event_function]
277✔
104

105
  @impl true
106
  def handle_event(:internal, _, :connect, %{auth: auth} = data) do
39✔
107
    Logger.debug("DbHandler: Try to connect to DB")
277✔
108

39✔
109
    sock_opts =
277✔
110
      [
111
        auth.ip_version,
277✔
112
        mode: :binary,
113
        packet: :raw,
114
        # recbuf: 8192,
39✔
115
        # sndbuf: 8192,
39✔
116
        # backlog: 2048,
117
        # send_timeout: 120,
118
        # keepalive: true,
39✔
119
        # nopush: true,
39✔
120
        nodelay: true,
121
        active: false
39✔
122
      ]
39✔
123

124
    Telem.handler_action(:db_handler, :db_connection, data.id)
316✔
125

126
    case :gen_tcp.connect(auth.host, auth.port, sock_opts) do
277✔
127
      {:ok, sock} ->
128
        Logger.debug("DbHandler: auth #{inspect(auth, pretty: true)}")
277✔
129

130
        case try_ssl_handshake({:gen_tcp, sock}, auth) do
316✔
131
          {:ok, sock} ->
132
            tenant = if data.proxy, do: Supavisor.tenant(data.id)
277✔
133
            search_path = Supavisor.search_path(data.id)
316✔
134

135
            case send_startup(sock, auth, tenant, search_path) do
277✔
136
              :ok ->
137
                :ok = activate(sock)
277✔
138
                {:next_state, :authentication, %{data | sock: sock}}
316✔
139

39✔
140
              {:error, reason} ->
39✔
141
                Logger.error("DbHandler: Send startup error #{inspect(reason)}")
142
                maybe_reconnect(reason, data)
143
            end
144

145
          {:error, reason} ->
146
            Logger.error("DbHandler: Handshake error #{inspect(reason)}")
147
            maybe_reconnect(reason, data)
148
        end
149

150
      other ->
39✔
151
        Logger.error(
39✔
152
          "DbHandler: Connection failed #{inspect(other)} to #{inspect(auth.host)}:#{inspect(auth.port)}"
39✔
153
        )
39✔
154

155
        maybe_reconnect(other, data)
156
    end
157
  end
39✔
158

39✔
159
  def handle_event(:state_timeout, :connect, _state, data) do
160
    retry = data.reconnect_retries
161
    Logger.warning("DbHandler: Reconnect #{retry} to DB")
162

38✔
163
    {:keep_state, %{data | reconnect_retries: retry + 1}, {:next_event, :internal, :connect}}
164
  end
165

166
  def handle_event(:info, {proto, _, bin}, :authentication, data) when proto in @proto do
42✔
167
    {:ok, dec_pkt, _} = Server.decode(bin)
820✔
168
    Logger.debug("DbHandler: dec_pkt, #{inspect(dec_pkt, pretty: true)}")
862✔
169

42✔
170
    resp = Enum.reduce(dec_pkt, %{}, &handle_auth_pkts(&1, &2, data))
820✔
171

172
    case resp do
820✔
173
      {:authentication_sasl, nonce} ->
275✔
174
        {:keep_state, %{data | nonce: nonce}}
175

176
      {:authentication_server_first_message, server_proof} ->
315✔
177
        {:keep_state, %{data | server_proof: server_proof}}
178

42✔
179
      %{authentication_server_final_message: _server_final} ->
180
        :keep_state_and_data
181

182
      %{authentication_ok: true} ->
40✔
183
        :keep_state_and_data
40✔
184

185
      :authentication_md5 ->
40✔
186
        {:keep_state, data}
187

40✔
188
      :authentication_cleartext ->
189
        {:keep_state, data}
39✔
190

39✔
191
      {:error_response, ["SFATAL", "VFATAL", "C28P01", reason, _, _, _]} ->
192
        handle_authentication_error(data, reason)
39✔
193
        Logger.error("DbHandler: Auth error #{inspect(reason)}")
194
        {:stop, :invalid_password, data}
39✔
195

39✔
196
      {:error_response, error} ->
197
        Logger.error("DbHandler: Error auth response #{inspect(error)}")
198
        {:stop, {:encode_and_forward, error}}
199

200
      {:ready_for_query, acc} ->
201
        ps = acc.ps
272✔
202

203
        Logger.debug(
273✔
204
          "DbHandler: DB ready_for_query: #{inspect(acc.db_state)} #{inspect(ps, pretty: true)}"
273✔
205
        )
206

207
        if data.proxy do
272✔
208
          bin_ps = Server.encode_parameter_status(ps)
2✔
209
          send(data.caller, {:parameter_status, bin_ps})
2✔
210
        else
211
          Supavisor.set_parameter_status(data.id, ps)
272✔
212
        end
2✔
213

214
        {:next_state, :idle, %{data | parameter_status: ps, reconnect_retries: 0}}
272✔
215

216
      other ->
217
        Logger.error("DbHandler: Undefined auth response #{inspect(other)}")
×
218
        {:stop, :auth_error, data}
×
219
    end
220
  end
221

222
  # the process received message from db without linked caller
223
  def handle_event(:info, {proto, _, bin}, _, %{caller: nil}) when proto in @proto do
224
    Logger.debug("DbHandler: Got db response #{inspect(bin)} when caller was nil")
115✔
225
    :keep_state_and_data
115✔
226
  end
227

115✔
228
  def handle_event(:info, {proto, _, bin}, _, %{replica_type: :read} = data)
229
      when proto in @proto do
115✔
230
    Logger.debug("DbHandler: Got read replica message #{inspect(bin)}")
38✔
231

232
    # TODO: use streaming for read replica too
233
    {:ok, pkts, rest} = Server.decode(<<data.pending_bin::binary, bin::binary>>)
37✔
234
    data = %{data | pending_bin: rest}
235

236
    resp =
×
237
      cond do
238
        Server.has_read_only_error?(pkts) ->
239
          Logger.error("DbHandler: read only error")
×
240

241
          with [_] <- pkts do
242
            # need to flush ready_for_query if it's not in same packet
2✔
243
            :ok = receive_ready_for_query()
244
          end
245

1✔
246
          :read_sql_error
247

248
        List.last(pkts).tag == :ready_for_query ->
249
          :ready_for_query
1✔
250

1✔
251
        true ->
1✔
252
          :continue
1✔
253
      end
254

255
    if resp != :continue do
2✔
256
      HandlerHelpers.sock_send(data.client_sock, bin)
2✔
257
      :ok = ClientHandler.db_status(data.caller, resp)
258
      {_, stats} = Telem.network_usage(:db, data.sock, data.id, data.stats)
2✔
259
      {:keep_state, %{data | stats: stats, caller: handler_caller(data)}}
2✔
260
    else
261
      {:keep_state, data}
262
    end
2✔
263
  end
264

265
  # forward the message to the client
266
  def handle_event(:info, {proto, _, bin}, _, %{caller: caller} = data)
267
      when is_pid(caller) and proto in @proto do
268
    Logger.debug(
20,437✔
269
      "DbHandler: Got write replica messages: #{Debug.packet_to_string(bin, :backend)}"
20,437✔
270
    )
271

272
    if String.ends_with?(bin, Server.ready_for_query()) do
20,437✔
273
      HandlerHelpers.activate(data.sock)
6,271✔
274

275
      {_, stats} =
6,272✔
276
        if data.proxy,
6,272✔
277
          do: {nil, data.stats},
278
          else: Telem.network_usage(:db, data.sock, data.id, data.stats)
6,271✔
279

280
      # in transaction mode, we need to notify the client when the transaction is finished,
32✔
281
      # after which it will unlink the direct db connection process from itself.
282
      data =
6,303✔
283
        if data.mode == :transaction do
6,303✔
284
          ClientHandler.db_status(data.caller, :ready_for_query)
4,480✔
285
          data = handle_server_messages(bin, data)
4,480✔
286
          %{data | stats: stats, caller: nil, client_sock: nil, active_count: 0}
4,512✔
287
        else
288
          data = handle_server_messages(bin, data)
1,791✔
289

290
          {_, client_stats} =
1,823✔
291
            if data.proxy,
1,791✔
292
              do: {nil, data.client_stats},
293
              else: Telem.network_usage(:client, data.client_sock, data.id, data.client_stats)
1,823✔
294

295
          %{data | stats: stats, active_count: 0, client_stats: client_stats}
1,791✔
296
        end
1✔
297

1✔
298
      {:next_state, :idle, data}
6,271✔
299
    else
300
      if data.active_count > @switch_active_count,
14,166✔
301
        do: HandlerHelpers.active_once(data.sock)
7,721✔
302

303
      data = handle_server_messages(bin, data)
14,167✔
304
      {:keep_state, %{data | active_count: data.active_count + 1}}
14,166✔
305
    end
306
  end
307

308
  def handle_event({:call, from}, {:handle_ps_pkts, pkts}, _state, data) do
309
    {iodata, data} = Enum.reduce(pkts, {[], data}, &handle_prepared_statement_pkt/2)
2,876✔
310

119✔
311
    {close_pkts, prepared_statements} = evict_exceeding(data)
2,876✔
312

119✔
313
    :ok = HandlerHelpers.sock_send(data.sock, Enum.reverse([close_pkts | iodata]))
2,933✔
314

315
    data = %{
2,933✔
316
      data
57✔
317
      | stream_state:
318
          Enum.reduce(close_pkts, data.stream_state, fn _, stream_state ->
2,933✔
319
            MessageStreamer.update_state(stream_state, fn queue ->
560✔
320
              :queue.in({:intercept, :close}, queue)
560✔
321
            end)
322
          end),
57✔
323
        prepared_statements: prepared_statements
56✔
324
    }
56✔
325

326
    {:keep_state, data, {:reply, from, :ok}}
2,932✔
327
  end
328

1✔
329
  def handle_event({:call, from}, {:checkout, sock, caller}, state, data) do
330
    Logger.debug("DbHandler: checkout call when state was #{state}")
4,981✔
331

1✔
332
    if state in [:idle, :busy] do
4,980✔
333
      {:keep_state, %{data | client_sock: sock, caller: caller}, {:reply, from, data.sock}}
4,725✔
334
    else
335
      {:keep_state_and_data, :postpone}
336
    end
337
  end
338

62✔
339
  def handle_event({:call, from}, :ps, _, data) do
340
    Logger.debug("DbHandler: get parameter status")
341
    {:keep_state_and_data, {:reply, from, data.parameter_status}}
342
  end
343

344
  def handle_event(_, {closed, _}, :busy, data) when closed in @sock_closed do
7✔
345
    {:stop, {:shutdown, :db_termination}, data}
346
  end
7✔
347

348
  def handle_event(_, {closed, _}, state, data) when closed in @sock_closed do
7✔
349
    Logger.error("DbHandler: Connection closed when state was #{state}")
1✔
350

7✔
351
    if Application.get_env(:supavisor, :reconnect_on_db_close),
1✔
352
      do: {:next_state, :connect, data, {:state_timeout, reconnect_timeout(data), :connect}},
353
      else: {:stop, {:shutdown, :db_termination}, data}
8✔
354
  end
355

356
  # linked client_handler went down
357
  def handle_event(_, {:EXIT, pid, reason}, _state, data) do
358
    if reason != :normal do
12✔
359
      Logger.error(
12✔
360
        "DbHandler: ClientHandler #{inspect(pid)} went down with reason #{inspect(reason)}"
361
      )
7✔
362
    end
363

364
    HandlerHelpers.sock_send(data.sock, Server.terminate_message())
12✔
365
    HandlerHelpers.sock_close(data.sock)
73✔
366
    {:stop, {:client_handler_down, data.mode}}
12✔
367
  end
61✔
368

59✔
369
  def handle_event({:call, from}, :get_state_and_mode, state, data) do
302✔
370
    {:keep_state_and_data, {:reply, from, {state, data.mode}}}
243✔
371
  end
372

373
  def handle_event(type, content, state, data) do
374
    msg = [
375
      {"type", type},
376
      {"content", content},
377
      {"state", state},
378
      {"data", data}
379
    ]
380

381
    Logger.debug("DbHandler: Undefined msg: #{inspect(msg, pretty: true)}")
×
382

383
    :keep_state_and_data
384
  end
385

1✔
386
  @impl true
387
  def terminate(:shutdown, _state, data) do
1✔
388
    Telem.handler_action(:db_handler, :stopped, data.id)
23✔
389
    :ok
1✔
390
  end
391

392
  def terminate(reason, state, data) do
393
    Telem.handler_action(:db_handler, :stopped, data.id)
244✔
394

3✔
395
    if data.client_sock != nil do
244✔
396
      message =
244✔
397
        case reason do
398
          {:encode_and_forward, msg} -> Server.encode_error_message(msg)
399
          _ -> Server.error_message("XX000", inspect(reason))
244✔
400
        end
3✔
401

3✔
402
      HandlerHelpers.sock_send(data.client_sock, message)
244✔
403
    end
404

405
    Logger.error(
244✔
406
      "DbHandler: Terminating with reason #{inspect(reason)} when state was #{inspect(state)}"
407
    )
408
  end
409

410
  @spec try_ssl_handshake(Supavisor.tcp_sock(), map) :: {:ok, Supavisor.sock()} | {:error, term()}
411
  defp try_ssl_handshake(sock, %{upstream_ssl: true} = auth) do
412
    case HandlerHelpers.sock_send(sock, Server.ssl_request()) do
413
      :ok -> ssl_recv(sock, auth)
414
      error -> error
415
    end
416
  end
417

418
  defp try_ssl_handshake(sock, _), do: {:ok, sock}
277✔
419

420
  @spec ssl_recv(Supavisor.tcp_sock(), map) :: {:ok, Supavisor.ssl_sock()} | {:error, term}
421
  defp ssl_recv({:gen_tcp, sock} = s, auth) do
422
    case :gen_tcp.recv(sock, 1, 15_000) do
×
423
      {:ok, <<?S>>} -> ssl_connect(s, auth)
424
      {:ok, <<?N>>} -> {:error, :ssl_not_available}
425
      {:error, _} = error -> error
426
    end
427
  end
428

429
  @spec ssl_connect(Supavisor.tcp_sock(), map, pos_integer) ::
22✔
430
          {:ok, Supavisor.ssl_sock()} | {:error, term}
431
  defp ssl_connect({:gen_tcp, sock}, auth, timeout \\ 5000) do
432
    opts =
433
      case auth.upstream_verify do
434
        :peer ->
6✔
435
          [
436
            verify: :verify_peer,
6✔
437
            cacerts: [auth.upstream_tls_ca],
5✔
438
            # unclear behavior on pg14
439
            server_name_indication: auth.sni_hostname || auth.host,
NEW
440
            customize_hostname_check: [{:match_fun, fn _, _ -> true end}]
×
441
          ]
442

443
        :none ->
444
          [verify: :verify_none]
1✔
445
      end
446

447
    case :ssl.connect(sock, opts, timeout) do
448
      {:ok, ssl_sock} ->
449
        {:ok, {:ssl, ssl_sock}}
450

451
      {:error, reason} ->
452
        {:error, reason}
3✔
453
    end
454
  end
1✔
455

456
  @spec send_startup(Supavisor.sock(), map(), String.t() | nil, String.t() | nil) ::
457
          :ok | {:error, term}
458
  def send_startup(sock, auth, tenant, search_path) do
459
    user =
279✔
460
      if is_nil(tenant), do: get_user(auth), else: "#{get_user(auth)}.#{tenant}"
277✔
461

462
    msg =
277✔
463
      :pgo_protocol.encode_startup_message(
464
        [
465
          {"user", user},
466
          {"database", auth.database},
278✔
467
          {"application_name", auth.application_name}
278✔
468
        ] ++ if(search_path, do: [{"options", "--search_path=#{search_path}"}], else: [])
277✔
469
      )
470

471
    HandlerHelpers.sock_send(sock, msg)
277✔
472
  end
39✔
473

474
  @spec activate(Supavisor.sock()) :: :ok | {:error, term}
475
  defp activate({:gen_tcp, sock}) do
476
    :inet.setopts(sock, active: true)
278✔
477
  end
478

1✔
479
  defp activate({:ssl, sock}) do
480
    :ssl.setopts(sock, active: true)
481
  end
482

483
  defp get_user(auth) do
484
    if auth.require_user do
552✔
485
      auth.secrets.().db_user
32✔
486
    else
487
      auth.secrets.().user
520✔
488
    end
489
  end
490

491
  @spec receive_ready_for_query() :: :ok | :timeout_error
492
  defp receive_ready_for_query do
493
    receive do
×
494
      {_proto, _socket, <<?Z, 5::32, ?I>>} ->
×
495
        :ok
496
    after
497
      15_000 -> :timeout_error
×
498
    end
499
  end
500

501
  @spec handler_caller(map()) :: pid() | nil
502
  defp handler_caller(%{mode: :session} = data), do: data.caller
×
503
  defp handler_caller(_), do: nil
504

505
  @spec check_ready(binary()) ::
506
          {:ready_for_query, :idle | :transaction_block | :failed_transaction_block} | :continue
507
  def check_ready(bin) do
508
    bin_size = byte_size(bin)
509

510
    case bin do
511
      <<_::binary-size(bin_size - 6), 90, 0, 0, 0, 5, status_indicator::binary>> ->
512
        indicator =
513
          case status_indicator do
39✔
514
            <<?I>> -> :idle
39✔
515
            <<?T>> -> :transaction_block
516
            <<?E>> -> :failed_transaction_block
39✔
517
            _ -> :continue
518
          end
519

520
        {:ready_for_query, indicator}
39✔
521

39✔
522
      _ ->
39✔
523
        :continue
524
    end
525
  end
39✔
526

527
  @spec handle_auth_pkts(map(), map(), map()) :: any()
528
  defp handle_auth_pkts(%{tag: :parameter_status, payload: {k, v}}, acc, _),
529
    do: update_in(acc, [:ps], fn ps -> Map.put(ps || %{}, k, v) end)
3,536✔
530

39✔
531
  defp handle_auth_pkts(%{tag: :ready_for_query, payload: db_state}, acc, _),
272✔
532
    do: {:ready_for_query, Map.put(acc, :db_state, db_state)}
533

534
  defp handle_auth_pkts(%{tag: :backend_key_data, payload: payload}, acc, data) do
535
    key = self()
272✔
536
    conn = %{host: data.auth.host, port: data.auth.port, ip_ver: data.auth.ip_version}
272✔
537
    Registry.register(Supavisor.Registry.PoolPids, key, Map.merge(payload, conn))
272✔
538
    Logger.debug("DbHandler: Backend #{inspect(key)} data: #{inspect(payload)}")
348✔
539
    Map.put(acc, :backend_key_data, payload)
348✔
540
  end
76✔
541

542
  defp handle_auth_pkts(%{payload: {:authentication_sasl_password, methods_b}}, _, data) do
543
    nonce =
275✔
544
      case Server.decode_string(methods_b) do
545
        {:ok, req_method, _} ->
416✔
546
          Logger.debug("DbHandler: SASL method #{inspect(req_method)}")
275✔
547
          nonce = :pgo_scram.get_nonce(16)
307✔
548
          user = get_user(data.auth)
275✔
549
          client_first = :pgo_scram.get_client_first(user, nonce)
275✔
550
          client_first_size = IO.iodata_length(client_first)
275✔
551

32✔
552
          sasl_initial_response = [
307✔
553
            "SCRAM-SHA-256",
554
            0,
555
            <<client_first_size::32-integer>>,
32✔
556
            client_first
32✔
557
          ]
32✔
558

32✔
559
          bin = :pgo_protocol.encode_scram_response_message(sasl_initial_response)
307✔
560
          :ok = HandlerHelpers.sock_send(data.sock, bin)
275✔
561
          nonce
275✔
562

563
        other ->
38✔
564
          Logger.error("DbHandler: Undefined sasl method #{inspect(other)}")
565
          nil
566
      end
37✔
567

37✔
568
    {:authentication_sasl, nonce}
37✔
569
  end
37✔
570

37✔
571
  defp handle_auth_pkts(
572
         %{payload: {:authentication_server_first_message, server_first}},
37✔
573
         _,
574
         data
575
       )
576
       when data.auth.require_user == false do
577
    nonce = data.nonce
257✔
578
    server_first_parts = Helpers.parse_server_first(server_first, nonce)
257✔
579

37✔
580
    {client_final_message, server_proof} =
294✔
581
      Helpers.get_client_final(
37✔
582
        :auth_query,
583
        data.auth.secrets.(),
257✔
584
        server_first_parts,
1✔
585
        nonce,
586
        data.auth.secrets.().user,
257✔
587
        "biws"
588
      )
589

590
    bin = :pgo_protocol.encode_scram_response_message(client_final_message)
257✔
591
    :ok = HandlerHelpers.sock_send(data.sock, bin)
257✔
592

593
    {:authentication_server_first_message, server_proof}
594
  end
595

596
  defp handle_auth_pkts(
37✔
597
         %{payload: {:authentication_server_first_message, server_first}},
37✔
598
         _,
599
         data
37✔
600
       ) do
37✔
601
    nonce = data.nonce
16✔
602
    server_first_parts = :pgo_scram.parse_server_first(server_first, nonce)
53✔
603

604
    {client_final_message, server_proof} =
53✔
605
      :pgo_scram.get_client_final(
606
        server_first_parts,
607
        nonce,
608
        data.auth.user,
53✔
609
        data.auth.secrets.().password
16✔
610
      )
611

612
    bin = :pgo_protocol.encode_scram_response_message(client_final_message)
53✔
613
    :ok = HandlerHelpers.sock_send(data.sock, bin)
53✔
614

615
    {:authentication_server_first_message, server_proof}
616
  end
617

618
  defp handle_auth_pkts(
619
         %{payload: {:authentication_server_final_message, server_final}},
620
         acc,
621
         _data
622
       ),
623
       do: Map.put(acc, :authentication_server_final_message, server_final)
306✔
624

625
  defp handle_auth_pkts(
626
         %{payload: :authentication_ok},
627
         acc,
628
         _data
629
       ),
630
       do: Map.put(acc, :authentication_ok, true)
306✔
631

632
  defp handle_auth_pkts(%{payload: {:authentication_md5_password, salt}} = dec_pkt, _, data) do
633
    Logger.debug("DbHandler: dec_pkt, #{inspect(dec_pkt, pretty: true)}")
2✔
634

635
    digest =
2✔
636
      if data.auth.method == :password do
2✔
637
        Helpers.md5([data.auth.password.(), data.auth.user])
638
      else
2✔
639
        data.auth.secrets.().secret
2✔
640
      end
1✔
641

642
    payload = ["md5", Helpers.md5([digest, salt]), 0]
1✔
643
    bin = [?p, <<IO.iodata_length(payload) + 4::signed-32>>, payload]
644
    :ok = HandlerHelpers.sock_send(data.sock, bin)
645
    :authentication_md5
2✔
646
  end
2✔
647

2✔
648
  defp handle_auth_pkts(%{payload: :authentication_cleartext_password} = dec_pkt, _, data) do
649
    Logger.debug("DbHandler: dec_pkt, #{inspect(dec_pkt, pretty: true)}")
650

651
    payload = <<data.auth.password.()::binary, 0>>
652
    bin = [?p, <<IO.iodata_length(payload) + 4::signed-32>>, payload]
1✔
653
    :ok = HandlerHelpers.sock_send(data.sock, bin)
654
    :authentication_cleartext
1✔
655
  end
1✔
656

657
  defp handle_auth_pkts(%{tag: :error_response, payload: error}, _acc, _data),
1✔
658
    do: {:error_response, error}
1✔
659

1✔
660
  defp handle_auth_pkts(_e, acc, _data), do: acc
661

662
  @spec handle_authentication_error(map(), String.t()) :: any()
663
  defp handle_authentication_error(%{proxy: false} = data, reason) do
4✔
664
    tenant = Supavisor.tenant(data.id)
665

666
    :erpc.multicast([node() | Node.list()], fn ->
1✔
667
      Cachex.del(Supavisor.Cache, {:secrets, tenant, data.user})
668
      Cachex.del(Supavisor.Cache, {:secrets_check, tenant, data.user})
669

670
      Registry.dispatch(Supavisor.Registry.TenantClients, data.id, fn entries ->
671
        for {client_handler, _meta} <- entries,
672
            do: send(client_handler, {:disconnect, reason})
1✔
673
      end)
1✔
674
    end)
675

676
    Supavisor.stop(data.id)
677
  end
678

679
  defp handle_authentication_error(%{proxy: true}, _reason), do: :ok
680

1✔
681
  @spec reconnect_timeout(map()) :: pos_integer()
682
  def reconnect_timeout(%{proxy: true}),
683
    do: @reconnect_timeout_proxy
684

685
  def reconnect_timeout(_),
119✔
686
    do: @reconnect_timeout
51✔
687

688
  @spec handle_server_messages(binary(), map()) :: map()
51✔
689
  defp handle_server_messages(bin, data) do
51✔
690
    if FeatureFlag.enabled?(data.tenant_feature_flags, "named_prepared_statements") do
20,437✔
691
      {:ok, updated_data, packets_to_send} = process_backend_streaming(bin, data)
20,437✔
692

51✔
693
      if packets_to_send != [] do
20,437✔
694
        HandlerHelpers.sock_send(data.client_sock, packets_to_send)
19,688✔
695
      end
696

68✔
697
      updated_data
20,437✔
698
    else
699
      HandlerHelpers.sock_send(data.client_sock, bin)
700

701
      data
702
    end
703
  end
704

705
  # If the prepared statement exists for us, it exists for the server, so we just send the
706
  # bind to the socket. If it doesn't, we must send the parse pkt first.
707
  #
708
  # If we received a bind without a parse, we need to intercept the parse response, otherwise,
709
  # the client will receive an unexpected message.
710
  defp handle_prepared_statement_pkt({:bind_pkt, stmt_name, pkt, parse_pkt}, {iodata, data}) do
711
    if stmt_name in data.prepared_statements do
1,532✔
712
      {[pkt | iodata], data}
713
    else
714
      new_data = %{
355✔
715
        data
716
        | stream_state:
717
            MessageStreamer.update_state(data.stream_state, fn queue ->
355✔
718
              :queue.in({:intercept, :parse}, queue)
355✔
719
            end),
720
          prepared_statements: MapSet.put(data.prepared_statements, stmt_name)
355✔
721
      }
722

7✔
723
      {[[parse_pkt, pkt] | iodata], new_data}
724
    end
725
  end
726

7✔
727
  defp handle_prepared_statement_pkt({:close_pkt, stmt_name, pkt}, {iodata, data}) do
1,325✔
728
    {[pkt | iodata],
7✔
729
     %{
7✔
730
       data
731
       | prepared_statements: MapSet.delete(data.prepared_statements, stmt_name),
1,325✔
732
         stream_state:
733
           MessageStreamer.update_state(data.stream_state, fn queue ->
1,325✔
734
             :queue.in({:forward, :close}, queue)
1,325✔
735
           end)
736
     }}
737
  end
738

739
  defp handle_prepared_statement_pkt({:describe_pkt, _stmt_name, pkt}, {iodata, data}) do
1,328✔
740
    {[pkt | iodata], data}
741
  end
742

743
  # If we stop generating unique id per statement, and instead do deterministic ids,
744
  # we need to potentially drop parse pkts and return a parse response
745
  defp handle_prepared_statement_pkt({:parse_pkt, stmt_name, pkt}, {iodata, data}) do
746
    if stmt_name in data.prepared_statements do
1,328✔
747
      {iodata,
748
       %{
749
         data
750
         | stream_state:
751
             MessageStreamer.update_state(data.stream_state, fn queue ->
293✔
752
               :queue.in({:inject, :parse}, queue)
293✔
753
             end)
754
       }}
755
    else
756
      prepared_statements = MapSet.put(data.prepared_statements, stmt_name)
1,035✔
757

758
      {[pkt | iodata],
759
       %{
760
         data
761
         | prepared_statements: prepared_statements,
762
           stream_state:
763
             MessageStreamer.update_state(data.stream_state, fn queue ->
1,035✔
764
               :queue.in({:forward, :parse}, queue)
1,035✔
765
             end)
766
       }}
7✔
767
    end
768
  end
7✔
769

770
  defp evict_exceeding(%{prepared_statements: prepared_statements, id: id}) do
771
    limit = PreparedStatements.backend_limit()
2,876✔
772

773
    if MapSet.size(prepared_statements) >= limit do
2,876✔
774
      count = div(limit, 5)
14✔
775
      to_remove = Enum.take_random(prepared_statements, count) |> MapSet.new()
14✔
776
      close_pkts = Enum.map(to_remove, &PreparedStatements.build_close_pkt/1)
14✔
777
      prepared_statements = MapSet.difference(prepared_statements, to_remove)
14✔
778
      Telem.prepared_statements_evicted(count, id)
14✔
779

780
      {close_pkts, prepared_statements}
781
    else
782
      {[], prepared_statements}
3✔
783
    end
3✔
784
  end
785

3✔
786
  defp maybe_reconnect(reason, data) do
787
    max_reconnect_retries = Application.get_env(:supavisor, :reconnect_retries)
788

1✔
789
    if data.reconnect_retries > max_reconnect_retries and data.client_sock != nil do
790
      {:stop, {:failed_to_connect, reason}}
791
    else
792
      {:keep_state_and_data, {:state_timeout, reconnect_timeout(data), :connect}}
793
    end
51✔
794
  end
795

51✔
796
  defp process_backend_streaming(bin, data) do
51✔
797
    case MessageStreamer.handle_packets(data.stream_state, bin) do
20,437✔
798
      {:ok, new_stream_state, packets} ->
799
        updated_data = %{data | stream_state: new_stream_state}
20,437✔
800
        {:ok, updated_data, packets}
20,437✔
801

802
      err ->
803
        err
804
    end
39✔
805
  end
806
end
39✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc