• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supavisor / 16470769502

23 Jul 2025 12:34PM UTC coverage: 57.067% (+1.7%) from 55.355%
16470769502

Pull #694

github

web-flow
Merge 78a9c0b2c into deaa48192
Pull Request #694: feat: improved named prepared statements support

175 of 217 new or added lines in 11 files covered. (80.65%)

16 existing lines in 4 files now uncovered.

1292 of 2264 relevant lines covered (57.07%)

1126.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.52
/lib/supavisor/db_handler.ex
1
defmodule Supavisor.DbHandler do
2
  @moduledoc """
3
  This module contains functions to start a link with the database, send requests to the database, and handle incoming messages from clients.
4
  It uses the Supavisor.Protocol.Server module to decode messages from the database and sends messages to clients Supavisor.ClientHandler.
5
  """
6

7
  @behaviour :gen_statem
8

9
  require Logger
10
  require Supavisor.Protocol.Server, as: Server
11

12
  alias Supavisor.Protocol.PreparedStatements
13

14
  alias Supavisor.{
15
    ClientHandler,
16
    HandlerHelpers,
17
    Helpers,
18
    Monitoring.Telem,
19
    Protocol.Server
20
  }
21

22
  @type state :: :connect | :authentication | :idle | :busy
23

24
  @reconnect_timeout 2_500
25
  @reconnect_timeout_proxy 500
26
  @sock_closed [:tcp_closed, :ssl_closed]
27
  @proto [:tcp, :ssl]
28
  @switch_active_count Application.compile_env(:supavisor, :switch_active_count)
29

30
  def start_link(config),
31
    do: :gen_statem.start_link(__MODULE__, config, hibernate_after: 5_000)
138✔
32

33
  def checkout(pid, sock, caller, timeout \\ 15_000),
3,524✔
34
    do: :gen_statem.call(pid, {:checkout, sock, caller}, timeout)
3,524✔
35

36
  @spec checkin(pid()) :: :ok
37
  def checkin(pid), do: :gen_statem.cast(pid, :checkin)
×
38

39
  @spec handle_prepared_statement_pkts(pid, [PreparedStatements.handled_pkt()]) :: :ok
40
  def handle_prepared_statement_pkts(pid, pkts) do
41
    :gen_statem.call(pid, {:handle_ps_pkts, pkts}, 15_000)
3,583✔
42
  end
43

44
  @spec get_state_and_mode(pid()) :: {:ok, {state, Supavisor.mode()}} | {:error, term()}
45
  def get_state_and_mode(pid) do
99✔
46
    {:ok, :gen_statem.call(pid, :get_state_and_mode, 5_000)}
47
  catch
48
    error, reason -> {:error, {error, reason}}
×
49
  end
50

51
  @spec stop(pid()) :: :ok
52
  def stop(pid) do
53
    Logger.debug("DbHandler: Stop pid #{inspect(pid)}")
94✔
54
    :gen_statem.stop(pid, {:shutdown, :client_termination}, 5_000)
94✔
55
  end
56

57
  @impl true
58
  def init(args) do
59
    Process.flag(:trap_exit, true)
139✔
60
    Helpers.set_log_level(args.log_level)
139✔
61
    Helpers.set_max_heap_size(90)
139✔
62

63
    {_, tenant} = args.tenant
139✔
64
    Logger.metadata(project: tenant, user: args.user, mode: args.mode)
139✔
65

66
    data =
139✔
67
      %{
68
        id: args.id,
139✔
69
        sock: nil,
70
        sent: false,
71
        auth: args.auth,
139✔
72
        user: args.user,
139✔
73
        tenant: args.tenant,
139✔
74
        buffer: [],
75
        anon_buffer: [],
76
        db_state: nil,
77
        parameter_status: %{},
78
        nonce: nil,
79
        messages: "",
80
        server_proof: nil,
81
        stats: %{},
82
        client_stats: %{},
83
        prepared_statements: MapSet.new(),
84
        pending_bin: "",
85
        action_queue: :queue.new(),
86
        mode: args.mode,
139✔
87
        replica_type: args.replica_type,
139✔
88
        reply: nil,
89
        caller: args[:caller] || nil,
139✔
90
        client_sock: args[:client_sock] || nil,
139✔
91
        proxy: args[:proxy] || false,
139✔
92
        active_count: 0,
93
        reconnect_retries: 0
94
      }
95

96
    Telem.handler_action(:db_handler, :started, args.id)
139✔
97
    {:ok, :connect, data, {:next_event, :internal, :connect}}
139✔
98
  end
99

100
  @impl true
101
  def callback_mode, do: [:handle_event_function]
138✔
102

103
  @impl true
104
  def handle_event(:internal, _, :connect, %{auth: auth} = data) do
105
    Logger.debug("DbHandler: Try to connect to DB")
140✔
106

107
    sock_opts =
140✔
108
      [
109
        auth.ip_version,
140✔
110
        mode: :binary,
111
        packet: :raw,
112
        # recbuf: 8192,
113
        # sndbuf: 8192,
114
        # backlog: 2048,
115
        # send_timeout: 120,
116
        # keepalive: true,
117
        # nopush: true,
118
        nodelay: true,
119
        active: false
120
      ]
121

122
    Telem.handler_action(:db_handler, :db_connection, data.id)
140✔
123

124
    case :gen_tcp.connect(auth.host, auth.port, sock_opts) do
140✔
125
      {:ok, sock} ->
126
        Logger.debug("DbHandler: auth #{inspect(auth, pretty: true)}")
139✔
127

128
        case try_ssl_handshake({:gen_tcp, sock}, auth) do
139✔
129
          {:ok, sock} ->
130
            tenant = if data.proxy, do: Supavisor.tenant(data.id)
139✔
131
            search_path = Supavisor.search_path(data.id)
139✔
132

133
            case send_startup(sock, auth, tenant, search_path) do
139✔
134
              :ok ->
135
                :ok = activate(sock)
138✔
136
                {:next_state, :authentication, %{data | sock: sock}}
138✔
137

138
              {:error, reason} ->
139
                Logger.error("DbHandler: Send startup error #{inspect(reason)}")
×
140
                maybe_reconnect(reason, data)
×
141
            end
142

143
          {:error, reason} ->
144
            Logger.error("DbHandler: Handshake error #{inspect(reason)}")
×
145
            maybe_reconnect(reason, data)
×
146
        end
147

148
      other ->
149
        Logger.error(
1✔
150
          "DbHandler: Connection failed #{inspect(other)} to #{inspect(auth.host)}:#{inspect(auth.port)}"
1✔
151
        )
152

153
        maybe_reconnect(other, data)
1✔
154
    end
155
  end
156

157
  def handle_event(:state_timeout, :connect, _state, data) do
158
    retry = data.reconnect_retries
×
159
    Logger.warning("DbHandler: Reconnect #{retry} to DB")
×
160

161
    {:keep_state, %{data | reconnect_retries: retry + 1}, {:next_event, :internal, :connect}}
×
162
  end
163

164
  def handle_event(:info, {proto, _, bin}, :authentication, data) when proto in @proto do
165
    {:ok, dec_pkt, _} = Server.decode(bin)
412✔
166
    Logger.debug("DbHandler: dec_pkt, #{inspect(dec_pkt, pretty: true)}")
412✔
167

168
    resp = Enum.reduce(dec_pkt, %{}, &handle_auth_pkts(&1, &2, data))
412✔
169

170
    case resp do
412✔
171
      {:authentication_sasl, nonce} ->
137✔
172
        {:keep_state, %{data | nonce: nonce}}
173

174
      {:authentication_server_first_message, server_proof} ->
137✔
175
        {:keep_state, %{data | server_proof: server_proof}}
176

177
      %{authentication_server_final_message: _server_final} ->
×
178
        :keep_state_and_data
179

180
      %{authentication_ok: true} ->
×
181
        :keep_state_and_data
182

183
      :authentication ->
×
184
        :keep_state_and_data
185

186
      :authentication_md5 ->
1✔
187
        {:keep_state, data}
188

189
      :authentication_cleartext ->
1✔
190
        {:keep_state, data}
191

192
      {:error_response, ["SFATAL", "VFATAL", "C28P01", reason, _, _, _]} ->
193
        handle_authentication_error(data, reason)
×
194
        Logger.error("DbHandler: Auth error #{inspect(reason)}")
×
195
        {:stop, :invalid_password, data}
×
196

197
      {:error_response, error} ->
198
        Logger.error("DbHandler: Error auth response #{inspect(error)}")
×
199
        {:stop, {:encode_and_forward, error}}
200

201
      {:ready_for_query, acc} ->
202
        ps = acc.ps
136✔
203

204
        Logger.debug(
136✔
205
          "DbHandler: DB ready_for_query: #{inspect(acc.db_state)} #{inspect(ps, pretty: true)}"
136✔
206
        )
207

208
        if data.proxy do
136✔
209
          bin_ps = Server.encode_parameter_status(ps)
×
210
          send(data.caller, {:parameter_status, bin_ps})
×
211
        else
212
          Supavisor.set_parameter_status(data.id, ps)
136✔
213
        end
214

215
        {:next_state, :idle, %{data | parameter_status: ps, reconnect_retries: 0},
136✔
216
         {:next_event, :internal, :check_buffer}}
217

218
      other ->
219
        Logger.error("DbHandler: Undefined auth response #{inspect(other)}")
×
220
        {:stop, :auth_error, data}
×
221
    end
222
  end
223

224
  def handle_event(:internal, :check_buffer, :idle, %{reply: from} = data) when from != nil do
225
    Logger.debug("DbHandler: Check buffer")
110✔
226
    {:next_state, :busy, %{data | reply: nil}, {:reply, from, data.sock}}
110✔
227
  end
228

229
  def handle_event(:internal, :check_buffer, :idle, %{buffer: buff, caller: caller} = data)
230
      when is_pid(caller) do
231
    if buff != [] do
×
232
      Logger.debug("DbHandler: Buffer is not empty, try to send #{IO.iodata_length(buff)} bytes")
×
233
      buff = Enum.reverse(buff)
×
NEW
234
      :ok = HandlerHelpers.sock_send(data.sock, buff)
×
235
    end
236

237
    {:next_state, :busy, %{data | buffer: []}}
×
238
  end
239

240
  # check if it needs to apply queries from the anon buffer
241
  def handle_event(:internal, :check_anon_buffer, _, %{anon_buffer: buff, caller: nil} = data) do
242
    Logger.debug("DbHandler: Check anon buffer")
3,425✔
243

244
    if buff != [] do
3,425✔
245
      Logger.debug(
×
246
        "DbHandler: Anon buffer is not empty, try to send #{IO.iodata_length(buff)} bytes"
×
247
      )
248

249
      buff = Enum.reverse(buff)
×
NEW
250
      :ok = HandlerHelpers.sock_send(data.sock, buff)
×
251
    end
252

253
    {:keep_state, %{data | anon_buffer: []}}
254
  end
255

256
  def handle_event(:internal, :check_anon_buffer, _, _) do
257
    Logger.debug("DbHandler: Anon buffer is empty")
826✔
258
    :keep_state_and_data
259
  end
260

261
  # the process received message from db without linked caller
262
  def handle_event(:info, {proto, _, bin}, _, %{caller: nil}) when proto in @proto do
263
    Logger.debug("DbHandler: Got db response #{inspect(bin)} when caller was nil")
×
264
    :keep_state_and_data
265
  end
266

267
  def handle_event(:info, {proto, _, bin}, _, %{replica_type: :read} = data)
268
      when proto in @proto do
269
    Logger.debug("DbHandler: Got read replica message #{inspect(bin)}")
×
NEW
270
    {:ok, pkts, rest} = Server.decode(<<data.pending_bin::binary, bin::binary>>)
×
NEW
271
    data = %{data | pending_bin: rest}
×
272

273
    resp =
×
274
      cond do
275
        Server.has_read_only_error?(pkts) ->
276
          Logger.error("DbHandler: read only error")
×
277

278
          with [_] <- pkts do
×
279
            # need to flush ready_for_query if it's not in same packet
280
            :ok = receive_ready_for_query()
×
281
          end
282

283
          :read_sql_error
284

285
        List.last(pkts).tag == :ready_for_query ->
×
286
          :ready_for_query
287

288
        true ->
×
289
          :continue
290
      end
291

292
    if resp != :continue do
×
NEW
293
      HandlerHelpers.sock_send(data.client_sock, bin)
×
NEW
294
      :ok = ClientHandler.db_status(data.caller, resp)
×
UNCOV
295
      {_, stats} = Telem.network_usage(:db, data.sock, data.id, data.stats)
×
296
      {:keep_state, %{data | stats: stats, caller: handler_caller(data)}}
297
    else
298
      {:keep_state, data}
299
    end
300
  end
301

302
  # forward the message to the client
303
  def handle_event(:info, {proto, _, bin}, _, %{caller: caller, reply: nil} = data)
304
      when is_pid(caller) and proto in @proto do
305
    Logger.debug("DbHandler: Got write replica message  #{inspect(bin)}")
9,175✔
306
    {pkts, rest} = Supavisor.Protocol.split_pkts(<<data.pending_bin::binary, bin::binary>>)
9,175✔
307
    data = %{data | pending_bin: rest}
9,175✔
308

309
    last_packet = List.last(pkts)
9,175✔
310

311
    if transaction_complete_pkt?(last_packet) do
9,175✔
312
      HandlerHelpers.activate(data.sock)
4,251✔
313

314
      {_, stats} =
4,251✔
315
        if data.proxy,
4,251✔
316
          do: {nil, data.stats},
×
317
          else: Telem.network_usage(:db, data.sock, data.id, data.stats)
4,251✔
318

319
      # in transaction mode, we need to notify the client when the transaction is finished,
320
      # after which it will unlink the direct db connection process from itself.
321
      data =
4,251✔
322
        if data.mode == :transaction do
4,251✔
323
          # for some reason, if we send the data **before** doing the `ready_for_query` cast,
324
          # we get race condition that sometimes causes msgs to be sent to the wrong socket
325
          ClientHandler.db_status(data.caller, :ready_for_query)
3,425✔
326
          data = handle_server_messages(pkts, data)
3,425✔
327
          %{data | stats: stats, caller: nil, client_sock: nil, active_count: 0}
3,425✔
328
        else
329
          data = handle_server_messages(pkts, data)
826✔
330

331
          {_, client_stats} =
826✔
332
            if data.proxy,
826✔
333
              do: {nil, data.client_stats},
×
334
              else: Telem.network_usage(:client, data.client_sock, data.id, data.client_stats)
826✔
335

336
          %{data | stats: stats, active_count: 0, client_stats: client_stats}
826✔
337
        end
338

339
      {:next_state, :idle, data, {:next_event, :internal, :check_anon_buffer}}
4,251✔
340
    else
341
      if data.active_count > @switch_active_count,
4,924✔
342
        do: HandlerHelpers.active_once(data.sock)
2,566✔
343

344
      data = handle_server_messages(pkts, data)
4,924✔
345
      {:keep_state, %{data | active_count: data.active_count + 1}}
4,924✔
346
    end
347
  end
348

349
  def handle_event({:call, from}, {:handle_ps_pkts, pkts}, _state, data) do
350
    {iodata, data} = Enum.reduce(pkts, {[], data}, &handle_prepared_statement_pkt/2)
3,583✔
351

352
    {close_pkts, prepared_statements} = evict_exceeding(data)
3,583✔
353

354
    :ok = HandlerHelpers.sock_send(data.sock, Enum.reverse([close_pkts | iodata]))
3,583✔
355

356
    data = %{
3,583✔
357
      data
358
      | action_queue:
359
          Enum.reduce(close_pkts, data.action_queue, fn _, queue ->
3,583✔
NEW
360
            :queue.in({:intercept, :close}, queue)
×
361
          end),
362
        prepared_statements: prepared_statements
363
    }
364

365
    {:keep_state, data, {:reply, from, :ok}}
3,583✔
366
  end
367

368
  def handle_event({:call, from}, {:checkout, sock, caller}, state, data) do
369
    Logger.debug("DbHandler: checkout call when state was #{state}")
3,524✔
370

371
    # store the reply ref and send it when the state is idle
372
    if state in [:idle, :busy],
3,524✔
373
      do: {:keep_state, %{data | client_sock: sock, caller: caller}, {:reply, from, data.sock}},
3,414✔
374
      else: {:keep_state, %{data | client_sock: sock, caller: caller, reply: from}}
375
  end
376

377
  def handle_event({:call, from}, :ps, _, data) do
378
    Logger.debug("DbHandler: get parameter status")
×
379
    {:keep_state_and_data, {:reply, from, data.parameter_status}}
×
380
  end
381

382
  def handle_event(_, {closed, _}, :busy, data) when closed in @sock_closed do
383
    {:stop, {:shutdown, :db_termination}, data}
×
384
  end
385

386
  def handle_event(_, {closed, _}, state, data) when closed in @sock_closed do
387
    Logger.error("DbHandler: Connection closed when state was #{state}")
×
388

389
    if Application.get_env(:supavisor, :reconnect_on_db_close),
×
390
      do: {:next_state, :connect, data, {:state_timeout, reconnect_timeout(data), :connect}},
×
391
      else: {:stop, {:shutdown, :db_termination}, data}
×
392
  end
393

394
  # linked client_handler went down
395
  def handle_event(_, {:EXIT, pid, reason}, _state, data) do
396
    if reason != :normal do
5✔
397
      Logger.error(
5✔
398
        "DbHandler: ClientHandler #{inspect(pid)} went down with reason #{inspect(reason)}"
399
      )
400
    end
401

402
    HandlerHelpers.sock_send(data.sock, Server.terminate_message())
5✔
403
    HandlerHelpers.sock_close(data.sock)
5✔
404
    {:stop, {:client_handler_down, data.mode}}
5✔
405
  end
406

407
  def handle_event({:call, from}, :get_state_and_mode, state, data) do
99✔
408
    {:keep_state_and_data, {:reply, from, {state, data.mode}}}
99✔
409
  end
410

411
  def handle_event(type, content, state, data) do
412
    msg = [
26✔
413
      {"type", type},
414
      {"content", content},
415
      {"state", state},
416
      {"data", data}
417
    ]
418

419
    Logger.debug("DbHandler: Undefined msg: #{inspect(msg, pretty: true)}")
26✔
420

421
    :keep_state_and_data
422
  end
423

424
  @impl true
425
  def terminate(:shutdown, _state, data) do
426
    Telem.handler_action(:db_handler, :stopped, data.id)
24✔
427
    :ok
428
  end
429

430
  def terminate(reason, state, data) do
431
    Telem.handler_action(:db_handler, :stopped, data.id)
100✔
432

433
    if data.client_sock != nil do
100✔
434
      message =
99✔
435
        case reason do
436
          {:encode_and_forward, msg} -> Server.encode_error_message(msg)
×
437
          _ -> Server.error_message("XX000", inspect(reason))
99✔
438
        end
439

440
      HandlerHelpers.sock_send(data.client_sock, message)
99✔
441
    end
442

443
    Logger.error(
100✔
444
      "DbHandler: Terminating with reason #{inspect(reason)} when state was #{inspect(state)}"
445
    )
446
  end
447

448
  @spec try_ssl_handshake(Supavisor.tcp_sock(), map) :: {:ok, Supavisor.sock()} | {:error, term()}
449
  defp try_ssl_handshake(sock, %{upstream_ssl: true} = auth) do
NEW
450
    case HandlerHelpers.sock_send(sock, Server.ssl_request()) do
×
451
      :ok -> ssl_recv(sock, auth)
×
452
      error -> error
×
453
    end
454
  end
455

456
  defp try_ssl_handshake(sock, _), do: {:ok, sock}
139✔
457

458
  @spec ssl_recv(Supavisor.tcp_sock(), map) :: {:ok, Supavisor.ssl_sock()} | {:error, term}
459
  defp ssl_recv({:gen_tcp, sock} = s, auth) do
460
    case :gen_tcp.recv(sock, 1, 15_000) do
×
461
      {:ok, <<?S>>} -> ssl_connect(s, auth)
×
462
      {:ok, <<?N>>} -> {:error, :ssl_not_available}
×
463
      {:error, _} = error -> error
×
464
    end
465
  end
466

467
  @spec ssl_connect(Supavisor.tcp_sock(), map, pos_integer) ::
468
          {:ok, Supavisor.ssl_sock()} | {:error, term}
469
  defp ssl_connect({:gen_tcp, sock}, auth, timeout \\ 5000) do
470
    opts =
×
471
      case auth.upstream_verify do
×
472
        :peer ->
×
473
          [
474
            verify: :verify_peer,
475
            cacerts: [auth.upstream_tls_ca],
×
476
            # unclear behavior on pg14
477
            server_name_indication: auth.sni_hostname || auth.host,
×
478
            customize_hostname_check: [{:match_fun, fn _, _ -> true end}]
×
479
          ]
480

481
        :none ->
×
482
          [verify: :verify_none]
483
      end
484

485
    case :ssl.connect(sock, opts, timeout) do
×
486
      {:ok, ssl_sock} ->
×
487
        {:ok, {:ssl, ssl_sock}}
488

489
      {:error, reason} ->
×
490
        {:error, reason}
491
    end
492
  end
493

494
  @spec send_startup(Supavisor.sock(), map(), String.t() | nil, String.t() | nil) ::
495
          :ok | {:error, term}
496
  def send_startup(sock, auth, tenant, search_path) do
497
    user =
139✔
498
      if is_nil(tenant), do: get_user(auth), else: "#{get_user(auth)}.#{tenant}"
139✔
499

500
    msg =
138✔
501
      :pgo_protocol.encode_startup_message(
502
        [
503
          {"user", user},
504
          {"database", auth.database},
138✔
505
          {"application_name", auth.application_name}
138✔
506
        ] ++ if(search_path, do: [{"options", "--search_path=#{search_path}"}], else: [])
138✔
507
      )
508

509
    HandlerHelpers.sock_send(sock, msg)
138✔
510
  end
511

512
  @spec activate(Supavisor.sock()) :: :ok | {:error, term}
513
  defp activate({:gen_tcp, sock}) do
514
    :inet.setopts(sock, active: true)
138✔
515
  end
516

517
  defp activate({:ssl, sock}) do
518
    :ssl.setopts(sock, active: true)
×
519
  end
520

521
  defp get_user(auth) do
522
    if auth.require_user do
276✔
523
      auth.secrets.().db_user
40✔
524
    else
525
      auth.secrets.().user
236✔
526
    end
527
  end
528

529
  @spec receive_ready_for_query() :: :ok | :timeout_error
530
  defp receive_ready_for_query do
531
    receive do
×
532
      {_proto, _socket, <<?Z, 5::32, ?I>>} ->
×
533
        :ok
534
    after
535
      15_000 -> :timeout_error
×
536
    end
537
  end
538

539
  @spec handler_caller(map()) :: pid() | nil
540
  defp handler_caller(%{mode: :session} = data), do: data.caller
×
541
  defp handler_caller(_), do: nil
×
542

543
  @spec check_ready(binary()) ::
544
          {:ready_for_query, :idle | :transaction_block | :failed_transaction_block} | :continue
545
  def check_ready(bin) do
546
    bin_size = byte_size(bin)
9✔
547

548
    case bin do
9✔
549
      <<_::binary-size(bin_size - 6), 90, 0, 0, 0, 5, status_indicator::binary>> ->
550
        indicator =
6✔
551
          case status_indicator do
552
            <<?I>> -> :idle
2✔
553
            <<?T>> -> :transaction_block
2✔
554
            <<?E>> -> :failed_transaction_block
2✔
555
            _ -> :continue
×
556
          end
557

558
        {:ready_for_query, indicator}
559

560
      _ ->
3✔
561
        :continue
562
    end
563
  end
564

565
  @spec handle_auth_pkts(map(), map(), map()) :: any()
566
  defp handle_auth_pkts(%{tag: :parameter_status, payload: {k, v}}, acc, _),
567
    do: update_in(acc, [:ps], fn ps -> Map.put(ps || %{}, k, v) end)
1,768✔
568

569
  defp handle_auth_pkts(%{tag: :ready_for_query, payload: db_state}, acc, _),
136✔
570
    do: {:ready_for_query, Map.put(acc, :db_state, db_state)}
571

572
  defp handle_auth_pkts(%{tag: :backend_key_data, payload: payload}, acc, data) do
573
    key = self()
136✔
574
    conn = %{host: data.auth.host, port: data.auth.port, ip_ver: data.auth.ip_version}
136✔
575
    Registry.register(Supavisor.Registry.PoolPids, key, Map.merge(payload, conn))
136✔
576
    Logger.debug("DbHandler: Backend #{inspect(key)} data: #{inspect(payload)}")
136✔
577
    Map.put(acc, :backend_key_data, payload)
136✔
578
  end
579

580
  defp handle_auth_pkts(%{payload: {:authentication_sasl_password, methods_b}}, _, data) do
581
    nonce =
137✔
582
      case Server.decode_string(methods_b) do
583
        {:ok, req_method, _} ->
584
          Logger.debug("DbHandler: SASL method #{inspect(req_method)}")
137✔
585
          nonce = :pgo_scram.get_nonce(16)
137✔
586
          user = get_user(data.auth)
137✔
587
          client_first = :pgo_scram.get_client_first(user, nonce)
137✔
588
          client_first_size = IO.iodata_length(client_first)
137✔
589

590
          sasl_initial_response = [
137✔
591
            "SCRAM-SHA-256",
592
            0,
593
            <<client_first_size::32-integer>>,
594
            client_first
595
          ]
596

597
          bin = :pgo_protocol.encode_scram_response_message(sasl_initial_response)
137✔
598
          :ok = HandlerHelpers.sock_send(data.sock, bin)
137✔
599
          nonce
137✔
600

601
        other ->
602
          Logger.error("DbHandler: Undefined sasl method #{inspect(other)}")
×
603
          nil
604
      end
605

606
    {:authentication_sasl, nonce}
607
  end
608

609
  defp handle_auth_pkts(
610
         %{payload: {:authentication_server_first_message, server_first}},
611
         _,
612
         data
613
       )
614
       when data.auth.require_user == false do
615
    nonce = data.nonce
118✔
616
    server_first_parts = Helpers.parse_server_first(server_first, nonce)
118✔
617

618
    {client_final_message, server_proof} =
118✔
619
      Helpers.get_client_final(
620
        :auth_query,
621
        data.auth.secrets.(),
118✔
622
        server_first_parts,
623
        nonce,
624
        data.auth.secrets.().user,
118✔
625
        "biws"
626
      )
627

628
    bin = :pgo_protocol.encode_scram_response_message(client_final_message)
118✔
629
    :ok = HandlerHelpers.sock_send(data.sock, bin)
118✔
630

631
    {:authentication_server_first_message, server_proof}
632
  end
633

634
  defp handle_auth_pkts(
635
         %{payload: {:authentication_server_first_message, server_first}},
636
         _,
637
         data
638
       ) do
639
    nonce = data.nonce
19✔
640
    server_first_parts = :pgo_scram.parse_server_first(server_first, nonce)
19✔
641

642
    {client_final_message, server_proof} =
19✔
643
      :pgo_scram.get_client_final(
644
        server_first_parts,
645
        nonce,
646
        data.auth.user,
19✔
647
        data.auth.secrets.().password
19✔
648
      )
649

650
    bin = :pgo_protocol.encode_scram_response_message(client_final_message)
19✔
651
    :ok = HandlerHelpers.sock_send(data.sock, bin)
19✔
652

653
    {:authentication_server_first_message, server_proof}
654
  end
655

656
  defp handle_auth_pkts(
657
         %{payload: {:authentication_server_final_message, server_final}},
658
         acc,
659
         _data
660
       ),
661
       do: Map.put(acc, :authentication_server_final_message, server_final)
136✔
662

663
  defp handle_auth_pkts(
664
         %{payload: :authentication_ok},
665
         acc,
666
         _data
667
       ),
668
       do: Map.put(acc, :authentication_ok, true)
136✔
669

670
  defp handle_auth_pkts(%{payload: {:authentication_md5_password, salt}} = dec_pkt, _, data) do
671
    Logger.debug("DbHandler: dec_pkt, #{inspect(dec_pkt, pretty: true)}")
1✔
672

673
    digest =
1✔
674
      if data.auth.method == :password do
1✔
675
        Helpers.md5([data.auth.password.(), data.auth.user])
1✔
676
      else
677
        data.auth.secrets.().secret
×
678
      end
679

680
    payload = ["md5", Helpers.md5([digest, salt]), 0]
1✔
681
    bin = [?p, <<IO.iodata_length(payload) + 4::signed-32>>, payload]
1✔
682
    :ok = HandlerHelpers.sock_send(data.sock, bin)
1✔
683
    :authentication_md5
684
  end
685

686
  defp handle_auth_pkts(%{payload: :authentication_cleartext_password} = dec_pkt, _, data) do
687
    Logger.debug("DbHandler: dec_pkt, #{inspect(dec_pkt, pretty: true)}")
1✔
688

689
    payload = <<data.auth.password.()::binary, 0>>
1✔
690
    bin = [?p, <<IO.iodata_length(payload) + 4::signed-32>>, payload]
1✔
691
    :ok = HandlerHelpers.sock_send(data.sock, bin)
1✔
692
    :authentication_cleartext
693
  end
694

695
  defp handle_auth_pkts(%{tag: :error_response, payload: error}, _acc, _data),
×
696
    do: {:error_response, error}
697

698
  defp handle_auth_pkts(_e, acc, _data), do: acc
×
699

700
  @spec handle_authentication_error(map(), String.t()) :: any()
701
  defp handle_authentication_error(%{proxy: false} = data, reason) do
702
    tenant = Supavisor.tenant(data.id)
×
703

704
    :erpc.multicast([node() | Node.list()], fn ->
×
705
      Cachex.del(Supavisor.Cache, {:secrets, tenant, data.user})
×
706
      Cachex.del(Supavisor.Cache, {:secrets_check, tenant, data.user})
×
707

708
      Registry.dispatch(Supavisor.Registry.TenantClients, data.id, fn entries ->
×
709
        for {client_handler, _meta} <- entries,
×
710
            do: send(client_handler, {:disconnect, reason})
711
      end)
712
    end)
713

714
    Supavisor.stop(data.id)
×
715
  end
716

717
  defp handle_authentication_error(%{proxy: true}, _reason), do: :ok
×
718

719
  @spec reconnect_timeout(map()) :: pos_integer()
720
  def reconnect_timeout(%{proxy: true}),
×
721
    do: @reconnect_timeout_proxy
722

723
  def reconnect_timeout(_),
1✔
724
    do: @reconnect_timeout
725

726
  @spec handle_server_messages([binary()], map()) :: map()
727
  defp handle_server_messages(pkts, data) do
728
    {packets_to_send, updated_data} = process_messages(pkts, [], data)
9,175✔
729

730
    if packets_to_send != [] do
9,175✔
731
      HandlerHelpers.sock_send(data.client_sock, Enum.reverse(packets_to_send))
8,150✔
732
    end
733

734
    updated_data
9,175✔
735
  end
736

737
  defp process_messages([msg | rest], acc_bins, data) do
738
    {acc_bins, data} = maybe_inject(data, acc_bins)
259,790✔
739

740
    case msg do
259,790✔
741
      Server.parse_complete_message() ->
742
        {acc_bins, data} = maybe_filter_message(msg, acc_bins, data, :parse)
1,064✔
743
        process_messages(rest, acc_bins, data)
1,064✔
744

745
      Server.close_complete_message() ->
746
        {acc_bins, data} = maybe_filter_message(msg, acc_bins, data, :close)
1,333✔
747
        process_messages(rest, acc_bins, data)
1,333✔
748

749
      _other ->
750
        process_messages(rest, [msg | acc_bins], data)
257,393✔
751
    end
752
  end
753

754
  defp process_messages([], acc_bins, data) do
755
    maybe_inject(data, acc_bins)
9,175✔
756
  end
757

758
  defp maybe_filter_message(msg, acc_bins, data, action_type) do
759
    case :queue.out(data.action_queue) do
2,397✔
760
      {{:value, {:forward, ^action_type}}, updated_queue} ->
1,351✔
761
        {[msg | acc_bins], %{data | action_queue: updated_queue}}
762

763
      {{:value, {:intercept, ^action_type}}, updated_queue} ->
3✔
764
        {acc_bins, %{data | action_queue: updated_queue}}
765

766
      _other ->
1,043✔
767
        {[msg | acc_bins], data}
768
    end
769
  end
770

771
  defp maybe_inject(data, acc_bins) do
772
    case :queue.out(data.action_queue) do
268,965✔
773
      {{:value, {:inject, :parse}}, q2} ->
1,305✔
774
        {[Server.parse_complete_message() | acc_bins], %{data | action_queue: q2}}
775

776
      _other ->
267,660✔
777
        {acc_bins, data}
778
    end
779
  end
780

781
  defp transaction_complete_pkt?(<<?Z, 5::32, ?I>>), do: true
4,251✔
782
  defp transaction_complete_pkt?(_), do: false
4,924✔
783

784
  # If the prepared statement exists for us, it exists for the server, so we just send the
785
  # bind to the socket. If it doesn't, we must send the parse pkt first.
786
  #
787
  # If we received a bind without a parse, we need to intercept the parse response, otherwise,
788
  # the client will receive an unexpected message.
789
  defp handle_prepared_statement_pkt({:bind_pkt, stmt_name, pkt, parse_pkt}, {iodata, data}) do
790
    if stmt_name in data.prepared_statements do
1,528✔
791
      {[pkt | iodata], data}
792
    else
793
      new_data = %{
3✔
794
        data
795
        | action_queue: :queue.in({:intercept, :parse}, data.action_queue),
3✔
796
          prepared_statements: MapSet.put(data.prepared_statements, stmt_name)
3✔
797
      }
798

799
      {[[parse_pkt, pkt] | iodata], new_data}
800
    end
801
  end
802

803
  defp handle_prepared_statement_pkt({:close_pkt, stmt_name, pkt}, {iodata, data}) do
1,332✔
804
    {[pkt | iodata],
805
     %{
806
       data
807
       | prepared_statements: MapSet.delete(data.prepared_statements, stmt_name),
1,332✔
808
         action_queue: :queue.in({:forward, :close}, data.action_queue)
1,332✔
809
     }}
810
  end
811

812
  defp handle_prepared_statement_pkt({:describe_pkt, _stmt_name, pkt}, {iodata, data}) do
2,030✔
813
    {[pkt | iodata], data}
814
  end
815

816
  # If we stop generating unique id per statement, and instead do deterministic ids,
817
  # we need to potentially drop parse pkts and return a parse response
818
  defp handle_prepared_statement_pkt({:parse_pkt, stmt_name, pkt}, {iodata, data}) do
819
    if stmt_name in data.prepared_statements do
1,325✔
820
      {iodata, %{data | action_queue: :queue.in({:inject, :parse}, data.action_queue)}}
1,305✔
821
    else
822
      prepared_statements = MapSet.put(data.prepared_statements, stmt_name)
20✔
823

824
      {[pkt | iodata],
825
       %{
826
         data
827
         | prepared_statements: prepared_statements,
828
           action_queue: :queue.in({:forward, :parse}, data.action_queue)
20✔
829
       }}
830
    end
831
  end
832

833
  defp evict_exceeding(%{prepared_statements: prepared_statements, id: id}) do
834
    limit = PreparedStatements.backend_limit()
3,583✔
835

836
    if MapSet.size(prepared_statements) >= limit do
3,583✔
NEW
837
      count = div(limit, 5)
×
NEW
838
      to_remove = Enum.take_random(prepared_statements, count) |> MapSet.new()
×
NEW
839
      close_pkts = Enum.map(to_remove, &PreparedStatements.build_close_pkt/1)
×
NEW
840
      prepared_statements = MapSet.difference(prepared_statements, to_remove)
×
NEW
841
      Telem.prepared_statements_evicted(count, id)
×
842

843
      {close_pkts, prepared_statements}
844
    else
845
      {[], prepared_statements}
846
    end
847
  end
848

849
  defp maybe_reconnect(reason, data) do
850
    max_reconnect_retries = Application.get_env(:supavisor, :reconnect_retries)
1✔
851

852
    if data.reconnect_retries > max_reconnect_retries and data.client_sock != nil do
1✔
853
      {:stop, {:failed_to_connect, reason}}
854
    else
855
      {:keep_state_and_data, {:state_timeout, reconnect_timeout(data), :connect}}
856
    end
857
  end
858
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc