• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supavisor / 16393134685

19 Jul 2025 10:00PM UTC coverage: 55.477% (-0.05%) from 55.524%
16393134685

Pull #705

github

web-flow
Merge 19ae5c712 into bdb6ca05b
Pull Request #705: feat: runtime-configurable connection retries. Support -1 for infinity

3 of 5 new or added lines in 1 file covered. (60.0%)

1 existing line in 1 file now uncovered.

1170 of 2109 relevant lines covered (55.48%)

131.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.8
/lib/supavisor/db_handler.ex
1
defmodule Supavisor.DbHandler do
2
  @moduledoc """
3
  This module contains functions to start a link with the database, send requests to the database, and handle incoming messages from clients.
4
  It uses the Supavisor.Protocol.Server module to decode messages from the database and sends messages to the client's Supavisor.ClientHandler process.
5
  """
6

7
  require Logger
8

9
  @behaviour :gen_statem
10

11
  alias Supavisor.{
12
    ClientHandler,
13
    HandlerHelpers,
14
    Helpers,
15
    Monitoring.Telem,
16
    Protocol.Server
17
  }
18

19
  @type state :: :connect | :authentication | :idle | :busy
20

21
  @reconnect_timeout 2_500
22
  @reconnect_timeout_proxy 500
23
  @sock_closed [:tcp_closed, :ssl_closed]
24
  @proto [:tcp, :ssl]
25
  @switch_active_count Application.compile_env(:supavisor, :switch_active_count)
26

27
  @doc """
  Starts a `:gen_statem` process running this module and links it to the caller.

  `config` is passed to `init/1` unchanged. The process is started with
  `hibernate_after: 5_000`.
  """
  def start_link(config) do
    :gen_statem.start_link(__MODULE__, config, hibernate_after: 5_000)
  end
102✔
29

30
  @doc """
  Synchronously sends a `{:checkout, sock, caller}` request to the handler `pid`.

  Returns the handler's reply; `timeout` (15 seconds by default) bounds the call.
  """
  def checkout(pid, sock, caller, timeout \\ 15_000) do
    request = {:checkout, sock, caller}
    :gen_statem.call(pid, request, timeout)
  end
642✔
32

33
  @doc """
  Asynchronously sends a `:checkin` cast to the handler `pid`.
  """
  @spec checkin(pid()) :: :ok
  def checkin(pid) do
    :gen_statem.cast(pid, :checkin)
  end
×
35

36
  @doc """
  Asks the handler `pid` for its current state and mode.

  Returns `{:ok, reply}` on success. Anything thrown, raised or exited during the call
  (which waits at most 5 seconds) is returned as `{:error, {kind, reason}}`.
  """
  @spec get_state_and_mode(pid()) :: {:ok, {state, Supavisor.mode()}} | {:error, term()}
  def get_state_and_mode(pid) do
    try do
      reply = :gen_statem.call(pid, :get_state_and_mode, 5_000)
      {:ok, reply}
    catch
      kind, payload -> {:error, {kind, payload}}
    end
  end
42

43
  @doc """
  Stops the handler `pid` with reason `{:shutdown, :client_termination}`.

  Waits up to 5 seconds for the process to terminate.
  """
  @spec stop(pid()) :: :ok
  def stop(pid) do
    Logger.debug("DbHandler: Stop pid #{inspect(pid)}")

    exit_reason = {:shutdown, :client_termination}
    :gen_statem.stop(pid, exit_reason, 5_000)
  end
48

49
  @impl true
50
  # Prepares the process (exit trapping, log level, heap limit, logger metadata), builds the
  # initial state data and starts in `:connect` with an internal `:connect` event queued.
  def init(args) do
    Process.flag(:trap_exit, true)
    Helpers.set_log_level(args.log_level)
    Helpers.set_max_heap_size(90)

    {_, tenant} = args.tenant
    Logger.metadata(project: tenant, user: args.user, mode: args.mode)

    # values taken from the start arguments
    from_args = %{
      id: args.id,
      auth: args.auth,
      user: args.user,
      tenant: args.tenant,
      mode: args.mode,
      replica_type: args.replica_type,
      caller: args[:caller] || nil,
      client_sock: args[:client_sock] || nil,
      proxy: args[:proxy] || false
    }

    # bookkeeping fields that always start empty
    blank = %{
      sock: nil,
      sent: false,
      buffer: [],
      anon_buffer: [],
      db_state: nil,
      parameter_status: %{},
      nonce: nil,
      messages: "",
      server_proof: nil,
      stats: %{},
      client_stats: %{},
      reply: nil,
      active_count: 0,
      reconnect_retries: 0
    }

    data = Map.merge(blank, from_args)

    Telem.handler_action(:db_handler, :started, args.id)
    {:ok, :connect, data, {:next_event, :internal, :connect}}
  end
88

89
  @impl true
90
  # All events are dispatched to handle_event/4.
  def callback_mode do
    [:handle_event_function]
  end
102✔
91

92
  @impl true
93
  # Opens the upstream connection: TCP connect, optional TLS upgrade, then the startup message.
  # On success the socket is switched to active mode and the state becomes :authentication.
  # On any failure the decision to retry or stop is delegated to maybe_reconnect/2.
  def handle_event(:internal, _, :connect, %{auth: auth} = data) do
    Logger.debug("DbHandler: Try to connect to DB")

    sock_opts =
      [
        auth.ip_version,
        mode: :binary,
        packet: :raw,
        # recbuf: 8192,
        # sndbuf: 8192,
        # backlog: 2048,
        # send_timeout: 120,
        # keepalive: true,
        # nopush: true,
        nodelay: true,
        active: false
      ]

    Telem.handler_action(:db_handler, :db_connection, data.id)

    case :gen_tcp.connect(auth.host, auth.port, sock_opts) do
      {:ok, tcp_sock} ->
        Logger.debug("DbHandler: auth #{inspect(auth, pretty: true)}")

        case try_ssl_handshake({:gen_tcp, tcp_sock}, auth) do
          {:ok, {transport, raw_sock} = sock} ->
            tenant = if data.proxy, do: Supavisor.tenant(data.id)
            search_path = Supavisor.search_path(data.id)

            case send_startup(sock, auth, tenant, search_path) do
              :ok ->
                :ok = activate(sock)
                {:next_state, :authentication, %{data | sock: sock}}

              {:error, reason} ->
                Logger.error("DbHandler: Send startup error #{inspect(reason)}")
                # the socket is not stored in the state, so release it before retrying
                transport.close(raw_sock)
                maybe_reconnect(reason, data)
            end

          {:error, reason} ->
            Logger.error("DbHandler: Handshake error #{inspect(reason)}")
            # without this every failed handshake would leave an open TCP socket behind
            :gen_tcp.close(tcp_sock)
            maybe_reconnect(reason, data)
        end

      other ->
        Logger.error(
          "DbHandler: Connection failed #{inspect(other)} to #{inspect(auth.host)}:#{inspect(auth.port)}"
        )

        maybe_reconnect(other, data)
    end
  end
145

146
  # Reconnect timer fired: bump the retry counter and queue another connect attempt.
  def handle_event(:state_timeout, :connect, _state, data) do
    attempt = Map.fetch!(data, :reconnect_retries)
    Logger.warning("DbHandler: Reconnect #{attempt} to DB")

    next_data = %{data | reconnect_retries: attempt + 1}
    {:keep_state, next_data, {:next_event, :internal, :connect}}
  end
152

153
  # Handles database messages received while authenticating. Every decoded packet is folded
  # through handle_auth_pkts/3 and the final accumulator decides the next transition.
  def handle_event(:info, {proto, _, bin}, :authentication, data) when proto in @proto do
    dec_pkt = Server.decode(bin)
    Logger.debug("DbHandler: dec_pkt, #{inspect(dec_pkt, pretty: true)}")

    outcome =
      Enum.reduce(dec_pkt, %{}, fn pkt, acc -> handle_auth_pkts(pkt, acc, data) end)

    case outcome do
      {:authentication_sasl, nonce} ->
        {:keep_state, %{data | nonce: nonce}}

      {:authentication_server_first_message, server_proof} ->
        {:keep_state, %{data | server_proof: server_proof}}

      %{authentication_server_final_message: _server_final} ->
        :keep_state_and_data

      %{authentication_ok: true} ->
        :keep_state_and_data

      :authentication ->
        :keep_state_and_data

      :authentication_md5 ->
        {:keep_state, data}

      # SQLSTATE 28P01 is handled separately from other error responses
      {:error_response, ["SFATAL", "VFATAL", "C28P01", reason, _, _, _]} ->
        handle_authentication_error(data, reason)
        Logger.error("DbHandler: Auth error #{inspect(reason)}")
        {:stop, :invalid_password, data}

      {:error_response, error} ->
        Logger.error("DbHandler: Error auth response #{inspect(error)}")
        {:stop, {:encode_and_forward, error}}

      {:ready_for_query, acc} ->
        ps = acc.ps

        Logger.debug(
          "DbHandler: DB ready_for_query: #{inspect(acc.db_state)} #{inspect(ps, pretty: true)}"
        )

        # proxy connections hand the parameters to the caller, others store them per pool id
        if data.proxy do
          encoded_ps = Server.encode_parameter_status(ps)
          send(data.caller, {:parameter_status, encoded_ps})
        else
          Supavisor.set_parameter_status(data.id, ps)
        end

        ready_data = %{data | parameter_status: ps, reconnect_retries: 0}
        {:next_state, :idle, ready_data, {:next_event, :internal, :check_buffer}}

      other ->
        Logger.error("DbHandler: Undefined auth response #{inspect(other)}")
        {:stop, :auth_error, data}
    end
  end
209

210
  # A checkout call was parked while connecting: answer it with the socket and go :busy.
  def handle_event(:internal, :check_buffer, :idle, %{reply: from} = data)
      when not is_nil(from) do
    Logger.debug("DbHandler: Check buffer")

    reply_action = {:reply, from, data.sock}
    {:next_state, :busy, %{data | reply: nil}, reply_action}
  end
214

215
  # A caller is already attached: send anything buffered to the database, then go :busy.
  def handle_event(:internal, :check_buffer, :idle, %{buffer: buff, caller: caller} = data)
      when is_pid(caller) do
    case buff do
      [] ->
        :ok

      pending ->
        Logger.debug(
          "DbHandler: Buffer is not empty, try to send #{IO.iodata_length(pending)} bytes"
        )

        # entries are stored newest-first, so reverse before sending
        :ok = sock_send(data.sock, Enum.reverse(pending))
    end

    {:next_state, :busy, %{data | buffer: []}}
  end
225

226
  # check if it needs to apply queries from the anon buffer
227
  def handle_event(:internal, :check_anon_buffer, _, %{anon_buffer: buff, caller: nil} = data) do
    Logger.debug("DbHandler: Check anon buffer")

    case buff do
      [] ->
        :ok

      queued ->
        Logger.debug(
          "DbHandler: Anon buffer is not empty, try to send #{IO.iodata_length(queued)} bytes"
        )

        # entries are prepended as they arrive, so reverse to restore arrival order
        :ok = sock_send(data.sock, Enum.reverse(queued))
    end

    {:keep_state, %{data | anon_buffer: []}}
  end

  # Reached when the clause above did not match (a caller is attached); nothing is sent.
  def handle_event(:internal, :check_anon_buffer, _state, _data) do
    Logger.debug("DbHandler: Anon buffer is empty")
    :keep_state_and_data
  end
246

247
  # the process received message from db without linked caller
248
  def handle_event(:info, {proto, _sock, payload}, _state, %{caller: nil})
      when proto in @proto do
    # nobody to forward to: log the payload and drop it
    Logger.debug("DbHandler: Got db response #{inspect(payload)} when caller was nil")
    :keep_state_and_data
  end
252

253
  # Handles database messages on a read-replica connection. The message is classified as a
  # read-only error, a ready_for_query, or "more to come"; in the first two cases the
  # ClientHandler is notified through ClientHandler.db_status/3.
  def handle_event(:info, {proto, _, bin}, _, %{replica_type: :read} = data)
      when proto in @proto do
    Logger.debug("DbHandler: Got read replica message #{inspect(bin)}")
    pkts = Server.decode(bin)

    resp =
      cond do
        Server.has_read_only_error?(pkts) ->
          Logger.error("DbHandler: read only error")

          # need to flush ready_for_query if it's not in same packet
          if match?([_], pkts), do: :ok = receive_ready_for_query()

          :read_sql_error

        # List.last/1 returns nil for an empty list; matching on the shape avoids crashing
        # on `.tag` when nothing was decoded
        match?(%{tag: :ready_for_query}, List.last(pkts)) ->
          :ready_for_query

        true ->
          :continue
      end

    case resp do
      :continue ->
        :keep_state_and_data

      status ->
        :ok = ClientHandler.db_status(data.caller, status, bin)
        {_, stats} = Telem.network_usage(:db, data.sock, data.id, data.stats)
        {:keep_state, %{data | stats: stats, caller: handler_caller(data)}}
    end
  end
285

286
  # forward the message to the client
287
  def handle_event(:info, {proto, _, bin}, _, %{caller: caller, reply: nil} = data)
      when is_pid(caller) and proto in @proto do
    Logger.debug("DbHandler: Got write replica message  #{inspect(bin)}")

    if String.ends_with?(bin, Server.ready_for_query()) do
      # the response is complete: re-arm the db socket and settle the counters
      HandlerHelpers.activate(data.sock)

      # network usage is not collected for proxy connections
      stats =
        if data.proxy do
          data.stats
        else
          {_, db_stats} = Telem.network_usage(:db, data.sock, data.id, data.stats)
          db_stats
        end

      # in transaction mode, we need to notify the client when the transaction is finished,
      # after which it will unlink the direct db connection process from itself.
      settled =
        case data.mode do
          :transaction ->
            ClientHandler.db_status(data.caller, :ready_for_query, bin)
            %{data | stats: stats, caller: nil, client_sock: nil, active_count: 0}

          _other_mode ->
            HandlerHelpers.sock_send(data.client_sock, bin)

            client_stats =
              if data.proxy do
                data.client_stats
              else
                {_, fresh} =
                  Telem.network_usage(:client, data.client_sock, data.id, data.client_stats)

                fresh
              end

            %{data | stats: stats, active_count: 0, client_stats: client_stats}
        end

      {:next_state, :idle, settled, {:next_event, :internal, :check_anon_buffer}}
    else
      # mid-response chunk: pass it straight to the client socket and count it
      seen = data.active_count

      if seen > @switch_active_count, do: HandlerHelpers.active_once(data.sock)

      HandlerHelpers.sock_send(data.client_sock, bin)
      {:keep_state, %{data | active_count: seen + 1}}
    end
  end
325

326
  # Queues a prepared-statement change in the anon buffer and triggers a flush check.
  def handle_event(:info, {:handle_ps, payload, bin}, _state, data) do
    Logger.notice("DbHandler: Apply prepare statement change #{inspect(payload)}")

    queued = [bin | data.anon_buffer]
    {:keep_state, %{data | anon_buffer: queued}, {:next_event, :internal, :check_anon_buffer}}
  end
332

333
  # Attaches a client (socket + caller pid) to this handler. When the connection is usable
  # the database socket is returned immediately; otherwise the reply is parked in `reply`.
  def handle_event({:call, from}, {:checkout, sock, caller}, state, data) do
    Logger.debug("DbHandler: checkout call when state was #{state}")

    attached = %{data | client_sock: sock, caller: caller}

    case state do
      ready when ready in [:idle, :busy] ->
        {:keep_state, attached, {:reply, from, data.sock}}

      _not_ready ->
        # store the reply ref and send it when the state is idle
        {:keep_state, %{attached | reply: from}}
    end
  end
341

342
  # Replies with the stored parameter status map.
  def handle_event({:call, from}, :ps, _state, %{parameter_status: ps}) do
    Logger.debug("DbHandler: get parameter status")
    {:keep_state_and_data, {:reply, from, ps}}
  end
346

347
  # The database socket closed while :busy: always stop.
  def handle_event(_type, {closed, _sock}, :busy, data) when closed in @sock_closed do
    {:stop, {:shutdown, :db_termination}, data}
  end

  # The database socket closed in any other state: reconnect after a delay when the
  # :reconnect_on_db_close setting is truthy, otherwise stop.
  def handle_event(_type, {closed, _sock}, state, data) when closed in @sock_closed do
    Logger.error("DbHandler: Connection closed when state was #{state}")

    if Application.get_env(:supavisor, :reconnect_on_db_close) do
      retry_in = reconnect_timeout(data)
      {:next_state, :connect, data, {:state_timeout, retry_in, :connect}}
    else
      {:stop, {:shutdown, :db_termination}, data}
    end
  end
358

359
  # linked client_handler went down
360
  # Handles the exit signal of a linked process. When the handler is :busy or runs in
  # :session mode, the database connection is terminated and the handler stops; otherwise
  # the caller is detached and the buffer cleared so the connection can be reused.
  def handle_event(_, {:EXIT, pid, reason}, state, data) do
    if reason != :normal do
      Logger.error(
        "DbHandler: ClientHandler #{inspect(pid)} went down with reason #{inspect(reason)}"
      )
    end

    if state == :busy or data.mode == :session do
      case data.sock do
        {transport, raw_sock} = db_sock ->
          sock_send(db_sock, Server.terminate_message())
          # close through the socket's own module: the connection may be :ssl, not :gen_tcp
          transport.close(raw_sock)

        nil ->
          # no upstream connection was established yet, nothing to tear down
          :ok
      end

      {:stop, {:client_handler_down, data.mode}}
    else
      {:keep_state, %{data | caller: nil, buffer: []}}
    end
  end
375

376
  # Replies with `{state, mode}` without changing anything.
  def handle_event({:call, from}, :get_state_and_mode, state, %{mode: mode}) do
    {:keep_state_and_data, {:reply, from, {state, mode}}}
  end
379

380
  # Catch-all: log the unexpected event at debug level and ignore it.
  def handle_event(type, content, state, data) do
    msg = [{"type", type}, {"content", content}, {"state", state}, {"data", data}]

    Logger.debug("DbHandler: Undefined msg: #{inspect(msg, pretty: true)}")

    :keep_state_and_data
  end
392

393
  @impl true
394
  # Plain :shutdown: only record the stop.
  def terminate(:shutdown, _state, %{id: id}) do
    Telem.handler_action(:db_handler, :stopped, id)
    :ok
  end

  # Any other reason: record the stop, tell the attached client (if any) why the connection
  # is going away, and log the reason.
  def terminate(reason, state, data) do
    Telem.handler_action(:db_handler, :stopped, data.id)

    case data.client_sock do
      nil ->
        :ok

      client_sock ->
        error_bin =
          case reason do
            # an error that came from the database is re-encoded and forwarded
            {:encode_and_forward, msg} -> Server.encode_error_message(msg)
            _ -> Server.error_message("XX000", inspect(reason))
          end

        HandlerHelpers.sock_send(client_sock, error_bin)
    end

    Logger.error(
      "DbHandler: Terminating with reason #{inspect(reason)} when state was #{inspect(state)}"
    )
  end
416

417
  @spec try_ssl_handshake(Supavisor.tcp_sock(), map) :: {:ok, Supavisor.sock()} | {:error, term()}
418
  # With `upstream_ssl: true`, sends the SSL request and continues in ssl_recv/2; a failed
  # send is returned unchanged. Otherwise the socket is returned as-is.
  defp try_ssl_handshake(sock, %{upstream_ssl: true} = auth) do
    with :ok <- sock_send(sock, Server.ssl_request()) do
      ssl_recv(sock, auth)
    end
  end

  defp try_ssl_handshake(sock, _auth) do
    {:ok, sock}
  end
103✔
426

427
  @spec ssl_recv(Supavisor.tcp_sock(), map) :: {:ok, Supavisor.ssl_sock()} | {:error, term}
428
  # Reads the server's one-byte answer to the SSL request (15 second timeout).
  # `S` continues with the TLS upgrade and `N` yields `{:error, :ssl_not_available}`.
  # Any other byte is reported as `{:error, {:unexpected_ssl_response, byte}}` instead of
  # crashing, so the caller's normal error path can handle it.
  defp ssl_recv({:gen_tcp, sock} = s, auth) do
    case :gen_tcp.recv(sock, 1, 15_000) do
      {:ok, <<?S>>} -> ssl_connect(s, auth)
      {:ok, <<?N>>} -> {:error, :ssl_not_available}
      {:ok, other} -> {:error, {:unexpected_ssl_response, other}}
      {:error, _} = error -> error
    end
  end
435

436
  @spec ssl_connect(Supavisor.tcp_sock(), map, pos_integer) ::
437
          {:ok, Supavisor.ssl_sock()} | {:error, term}
438
  # Upgrades a connected TCP socket to TLS with options chosen by `auth.upstream_verify`.
  # Returns `{:ok, {:ssl, ssl_sock}}`, or the `{:error, reason}` from `:ssl.connect/3`.
  defp ssl_connect({:gen_tcp, sock}, auth, timeout \\ 5000) do
    tls_opts =
      case auth.upstream_verify do
        :none ->
          [verify: :verify_none]

        :peer ->
          [
            verify: :verify_peer,
            cacerts: [auth.upstream_tls_ca],
            # unclear behavior on pg14
            server_name_indication: auth.sni_hostname || auth.host,
            # this match_fun returns true for every hostname
            customize_hostname_check: [{:match_fun, fn _, _ -> true end}]
          ]
      end

    case :ssl.connect(sock, tls_opts, timeout) do
      {:ok, ssl_sock} -> {:ok, {:ssl, ssl_sock}}
      {:error, _reason} = failure -> failure
    end
  end
462

463
  @spec send_startup(Supavisor.sock(), map(), String.t() | nil, String.t() | nil) ::
464
          :ok | {:error, term}
465
  @doc """
  Encodes a startup message and sends it on `sock`.

  The user name comes from `auth` and gets a `.tenant` suffix when `tenant` is not nil.
  `database` and `application_name` are read from `auth`, and a truthy `search_path` is
  added as an `options` entry. Returns the result of the socket send.
  """
  def send_startup(sock, auth, tenant, search_path) do
    base_user = get_user(auth)
    user = if is_nil(tenant), do: base_user, else: "#{base_user}.#{tenant}"

    required = [
      {"user", user},
      {"database", auth.database},
      {"application_name", auth.application_name}
    ]

    optional =
      if search_path do
        [{"options", "--search_path=#{search_path}"}]
      else
        []
      end

    startup_msg = :pgo_protocol.encode_startup_message(required ++ optional)
    sock_send(sock, startup_msg)
  end
480

481
  @spec sock_send(Supavisor.sock(), iodata) :: :ok | {:error, term}
482
  # Sends `payload` through the transport module stored alongside the socket.
  defp sock_send({transport, socket}, payload), do: transport.send(socket, payload)
485

486
  @spec activate(Supavisor.sock()) :: :ok | {:error, term}
487
  # Switches the socket to `active: true`, using the setopts function of its transport.
  defp activate({:gen_tcp, tcp_sock}), do: :inet.setopts(tcp_sock, active: true)
  defp activate({:ssl, ssl_sock}), do: :ssl.setopts(ssl_sock, active: true)
494

495
  # Reads the user name from the secrets returned by `auth.secrets.()`:
  # `db_user` when `auth.require_user` is truthy, `user` otherwise.
  defp get_user(auth) do
    case auth.require_user do
      flag when flag in [nil, false] -> auth.secrets.().user
      _truthy -> auth.secrets.().db_user
    end
  end
502

503
  @spec receive_ready_for_query() :: :ok | :timeout_error
504
  # Waits up to 15 seconds for a socket message whose payload is exactly one ReadyForQuery
  # packet and consumes it. All three status indicators are accepted (`I`, `T`, `E`), so a
  # ReadyForQuery sent inside a transaction block is consumed too instead of timing out.
  # Returns :ok, or :timeout_error when no such message arrives in time.
  defp receive_ready_for_query do
    receive do
      {_proto, _socket, <<?Z, 5::32, status>>} when status in [?I, ?T, ?E] ->
        :ok
    after
      15_000 -> :timeout_error
    end
  end
512

513
  @spec handler_caller(map()) :: pid() | nil
514
  # Session mode keeps the current caller; every other mode drops it.
  defp handler_caller(data) do
    case data do
      %{mode: :session} -> data.caller
      _ -> nil
    end
  end
×
516

517
  @spec check_ready(binary()) ::
518
          {:ready_for_query, :idle | :transaction_block | :failed_transaction_block} | :continue
519
  @doc """
  Checks whether `bin` ends with a ReadyForQuery packet.

  Returns `{:ready_for_query, status}` when the last six bytes are `Z`, the 32-bit length
  `5` and a known status byte (`I`, `T` or `E`). Returns `:continue` otherwise, including
  when `bin` is shorter than six bytes or the status byte is not one of those three.
  """
  def check_ready(bin) do
    size = byte_size(bin)

    # only the trailing 6 bytes can hold a ReadyForQuery packet
    trailer = if size >= 6, do: binary_part(bin, size - 6, 6)

    case trailer do
      <<?Z, 5::32, ?I>> -> {:ready_for_query, :idle}
      <<?Z, 5::32, ?T>> -> {:ready_for_query, :transaction_block}
      <<?Z, 5::32, ?E>> -> {:ready_for_query, :failed_transaction_block}
      _ -> :continue
    end
  end
538

539
  @spec handle_auth_pkts(map(), map(), map()) :: any()
540
  # parameter_status: collect the key/value pair under :ps in the accumulator
  defp handle_auth_pkts(%{tag: :parameter_status, payload: {k, v}}, acc, _data) do
    update_in(acc, [:ps], fn
      nil -> %{k => v}
      false -> %{k => v}
      ps -> Map.put(ps, k, v)
    end)
  end

  # ready_for_query: wrap the accumulator, now carrying :db_state, in a tagged tuple
  defp handle_auth_pkts(%{tag: :ready_for_query, payload: db_state}, acc, _data) do
    {:ready_for_query, Map.put(acc, :db_state, db_state)}
  end
545

546
  # backend_key_data: register this process in Supavisor.Registry.PoolPids with the key data
  # merged with the connection's host/port/ip version, and keep the payload in the accumulator.
  defp handle_auth_pkts(%{tag: :backend_key_data, payload: payload}, acc, data) do
    key = self()
    %{host: host, port: port, ip_version: ip_ver} = data.auth

    registered = Map.merge(payload, %{host: host, port: port, ip_ver: ip_ver})
    Registry.register(Supavisor.Registry.PoolPids, key, registered)

    Logger.debug("DbHandler: Backend #{inspect(key)} data: #{inspect(payload)}")
    Map.put(acc, :backend_key_data, payload)
  end
553

554
  # SASL request: generate a nonce, send the SCRAM-SHA-256 initial response and return the
  # nonce. When the method list cannot be decoded nothing is sent and the nonce is nil.
  defp handle_auth_pkts(%{payload: {:authentication_sasl_password, methods_b}}, _, data) do
    case Server.decode_string(methods_b) do
      {:ok, req_method, _rest} ->
        Logger.debug("DbHandler: SASL method #{inspect(req_method)}")

        nonce = :pgo_scram.get_nonce(16)
        client_first = :pgo_scram.get_client_first(get_user(data.auth), nonce)

        # mechanism name, NUL terminator, 32-bit length of client-first, client-first itself
        initial_response = [
          "SCRAM-SHA-256",
          0,
          <<IO.iodata_length(client_first)::32-integer>>,
          client_first
        ]

        encoded = :pgo_protocol.encode_scram_response_message(initial_response)
        :ok = HandlerHelpers.sock_send(data.sock, encoded)

        {:authentication_sasl, nonce}

      other ->
        Logger.error("DbHandler: Undefined sasl method #{inspect(other)}")
        {:authentication_sasl, nil}
    end
  end
582

583
  # SCRAM server-first message when `require_user` is false: the client-final message is
  # built from the stored secrets through Helpers.get_client_final/6 and sent to the database.
  defp handle_auth_pkts(
         %{payload: {:authentication_server_first_message, server_first}},
         _acc,
         %{nonce: nonce} = data
       )
       when data.auth.require_user == false do
    parsed_first = Helpers.parse_server_first(server_first, nonce)

    secrets = data.auth.secrets.()
    user = data.auth.secrets.().user

    {client_final, server_proof} =
      Helpers.get_client_final(:auth_query, secrets, parsed_first, nonce, user, "biws")

    encoded = :pgo_protocol.encode_scram_response_message(client_final)
    :ok = HandlerHelpers.sock_send(data.sock, encoded)

    {:authentication_server_first_message, server_proof}
  end
607

608
  # SCRAM server-first message in the remaining case: the client-final message is built by
  # :pgo_scram from `auth.user` and the password in the secrets, then sent to the database.
  defp handle_auth_pkts(
         %{payload: {:authentication_server_first_message, server_first}},
         _acc,
         data
       ) do
    nonce = data.nonce
    parsed_first = :pgo_scram.parse_server_first(server_first, nonce)

    user = data.auth.user
    password = data.auth.secrets.().password

    {client_final, server_proof} =
      :pgo_scram.get_client_final(parsed_first, nonce, user, password)

    encoded = :pgo_protocol.encode_scram_response_message(client_final)
    :ok = HandlerHelpers.sock_send(data.sock, encoded)

    {:authentication_server_first_message, server_proof}
  end
629

630
  # server-final message: remember it in the accumulator
  defp handle_auth_pkts(%{payload: {:authentication_server_final_message, final}}, acc, _data) do
    Map.put(acc, :authentication_server_final_message, final)
  end

  # authentication_ok: flag it in the accumulator
  defp handle_auth_pkts(%{payload: :authentication_ok}, acc, _data) do
    Map.put(acc, :authentication_ok, true)
  end
100✔
643

644
  # MD5 request: answer with "md5" <> md5(digest <> salt), where the digest is computed from
  # password and user for the :password method, or read from the stored secret otherwise.
  defp handle_auth_pkts(%{payload: {:authentication_md5_password, salt}} = dec_pkt, _, data) do
    Logger.debug("DbHandler: dec_pkt, #{inspect(dec_pkt, pretty: true)}")

    digest =
      case data.auth.method do
        :password -> Helpers.md5([data.auth.password.(), data.auth.user])
        _other -> data.auth.secrets.().secret
      end

    # NUL-terminated response body
    body = ["md5", Helpers.md5([digest, salt]), 0]
    # the length field counts itself (4 bytes) plus the body
    length_field = <<IO.iodata_length(body) + 4::signed-32>>

    :ok = HandlerHelpers.sock_send(data.sock, [?p, length_field, body])
    :authentication_md5
  end
659

660
  # error_response: replace the accumulator with the tagged error
  defp handle_auth_pkts(%{tag: :error_response, payload: error}, _acc, _data) do
    {:error_response, error}
  end

  # any other packet leaves the accumulator untouched
  defp handle_auth_pkts(_pkt, acc, _data) do
    acc
  end
×
664

665
  @spec handle_authentication_error(map(), String.t()) :: any()
666
  # Non-proxy connection: on every known node (this one included) delete the cached secrets
  # for the tenant/user and send `{:disconnect, reason}` to each client handler registered
  # under this id, then stop the pool via Supavisor.stop/1.
  defp handle_authentication_error(%{proxy: false} = data, reason) do
    tenant = Supavisor.tenant(data.id)
    cluster = [node() | Node.list()]

    :erpc.multicast(cluster, fn ->
      for cache_kind <- [:secrets, :secrets_check] do
        Cachex.del(Supavisor.Cache, {cache_kind, tenant, data.user})
      end

      Registry.dispatch(Supavisor.Registry.TenantClients, data.id, fn entries ->
        for {client_handler, _meta} <- entries do
          send(client_handler, {:disconnect, reason})
        end
      end)
    end)

    Supavisor.stop(data.id)
  end

  # Proxy connections do nothing here.
  defp handle_authentication_error(%{proxy: true}, _reason) do
    :ok
  end
×
683

684
  @spec reconnect_timeout(map()) :: pos_integer()
685
  @doc """
  Returns the delay in milliseconds before the next reconnect attempt.

  Data with `proxy: true` gets the proxy timeout; everything else gets the regular one.
  """
  def reconnect_timeout(data) do
    case data do
      %{proxy: true} -> @reconnect_timeout_proxy
      _ -> @reconnect_timeout
    end
  end
690

691
  # Decides what happens after a failed connection attempt.
  #
  # Stops with `{:failed_to_connect, reason}` once the retry limit from the
  # `:reconnect_retries` setting is exceeded AND a client socket is attached. In every other
  # case another attempt is scheduled after reconnect_timeout/1.
  defp maybe_reconnect(reason, data) do
    limit = Application.get_env(:supavisor, :reconnect_retries)

    if retries_exhausted?(data.reconnect_retries, limit) and data.client_sock != nil do
      {:stop, {:failed_to_connect, reason}}
    else
      {:keep_state_and_data, {:state_timeout, reconnect_timeout(data), :connect}}
    end
  end

  # -1 means "retry forever". Without this clause `retries > -1` is always true and the
  # handler would give up on the very first failure.
  defp retries_exhausted?(_retries, -1), do: false
  defp retries_exhausted?(retries, limit) when is_number(limit), do: retries > limit
  # a non-numeric limit (nil, :infinity, ...) never exhausts, made explicit here instead of
  # relying on number-vs-atom term ordering
  defp retries_exhausted?(_retries, _unlimited), do: false
700
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc