• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supavisor / e5e7ebfe80dbec4965226225050d4ef5c8216e88-PR-605

21 Feb 2025 02:35PM UTC coverage: 45.973% (-0.03%) from 46.003%
e5e7ebfe80dbec4965226225050d4ef5c8216e88-PR-605

Pull #605

github

hauleth
fix: remaining SSL connections that need to set `verify_none` option
Pull Request #605: fix: remaining SSL connections that need to set `verify_none` option

2 of 9 new or added lines in 3 files covered. (22.22%)

267 existing lines in 26 files now uncovered.

959 of 2086 relevant lines covered (45.97%)

635.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.4
/lib/supavisor/db_handler.ex
1
defmodule Supavisor.DbHandler do
2
  @moduledoc """
3
  This module contains functions to start a link with the database, send requests to the database, and handle incoming messages from clients.
4
  It uses the Supavisor.Protocol.Server module to decode messages from the database and sends messages to clients Supavisor.ClientHandler.
5
  """
6

7
  require Logger
8

9
  @behaviour :gen_statem
10

11
  alias Supavisor.{
12
    ClientHandler,
13
    HandlerHelpers,
14
    Helpers,
15
    Monitoring.Telem,
16
    Protocol.Server
17
  }
18

19
  @type state :: :connect | :authentication | :idle | :busy
20

21
  @reconnect_timeout 2_500
22
  @reconnect_timeout_proxy 500
23
  @sock_closed [:tcp_closed, :ssl_closed]
24
  @proto [:tcp, :ssl]
25
  @switch_active_count Application.compile_env(:supavisor, :switch_active_count)
26
  @reconnect_retries Application.compile_env(:supavisor, :reconnect_retries)
27

28
  def start_link(config),
29
    do: :gen_statem.start_link(__MODULE__, config, hibernate_after: 5_000)
160✔
30

31
  def checkout(pid, sock, caller, timeout \\ 15_000),
161✔
32
    do: :gen_statem.call(pid, {:checkout, sock, caller}, timeout)
161✔
33

34
  @spec checkin(pid()) :: :ok
35
  def checkin(pid), do: :gen_statem.cast(pid, :checkin)
×
36

37
  @spec get_state_and_mode(pid()) :: {:ok, {state, Supavisor.mode()}} | {:error, term()}
38
  def get_state_and_mode(pid) do
146✔
39
    {:ok, :gen_statem.call(pid, :get_state_and_mode, 5_000)}
40
  catch
UNCOV
41
    error, reason -> {:error, {error, reason}}
3✔
42
  end
43

44
  @spec stop(pid()) :: :ok
45
  def stop(pid) do
46
    Logger.debug("DbHandler: Stop pid #{inspect(pid)}")
143✔
47
    :gen_statem.stop(pid, {:shutdown, :client_termination}, 5_000)
143✔
48
  end
49

50
  @impl true
51
  def init(args) do
52
    Process.flag(:trap_exit, true)
161✔
53
    Helpers.set_log_level(args.log_level)
161✔
54
    Helpers.set_max_heap_size(90)
161✔
55

56
    {_, tenant} = args.tenant
161✔
57
    Logger.metadata(project: tenant, user: args.user, mode: args.mode)
161✔
58

59
    data =
161✔
60
      %{
61
        id: args.id,
161✔
62
        sock: nil,
63
        sent: false,
64
        auth: args.auth,
161✔
65
        user: args.user,
161✔
66
        tenant: args.tenant,
161✔
67
        buffer: [],
68
        anon_buffer: [],
69
        db_state: nil,
70
        parameter_status: %{},
71
        nonce: nil,
72
        messages: "",
73
        server_proof: nil,
74
        stats: %{},
75
        client_stats: %{},
76
        mode: args.mode,
161✔
77
        replica_type: args.replica_type,
161✔
78
        reply: nil,
79
        caller: args[:caller] || nil,
161✔
80
        client_sock: args[:client_sock] || nil,
161✔
81
        proxy: args[:proxy] || false,
161✔
82
        active_count: 0,
83
        reconnect_retries: 0
84
      }
85

86
    Telem.handler_action(:db_handler, :started, args.id)
161✔
87
    {:ok, :connect, data, {:next_event, :internal, :connect}}
161✔
88
  end
89

90
  @impl true
91
  def callback_mode, do: [:handle_event_function]
160✔
92

93
  @impl true
94
  def handle_event(:internal, _, :connect, %{auth: auth} = data) do
95
    Logger.debug("DbHandler: Try to connect to DB")
162✔
96

97
    sock_opts =
162✔
98
      [
99
        auth.ip_version,
162✔
100
        mode: :binary,
101
        packet: :raw,
102
        # recbuf: 8192,
103
        # sndbuf: 8192,
104
        # backlog: 2048,
105
        # send_timeout: 120,
106
        # keepalive: true,
107
        # nopush: true,
108
        nodelay: true,
109
        active: false
110
      ]
111

112
    maybe_reconnect_callback = fn reason ->
162✔
UNCOV
113
      if data.reconnect_retries > @reconnect_retries and data.client_sock != nil,
1✔
114
        do: {:stop, {:failed_to_connect, reason}},
115
        else: {:keep_state_and_data, {:state_timeout, reconnect_timeout(data), :connect}}
116
    end
117

118
    Telem.handler_action(:db_handler, :db_connection, data.id)
162✔
119

120
    case :gen_tcp.connect(auth.host, auth.port, sock_opts) do
162✔
121
      {:ok, sock} ->
122
        Logger.debug("DbHandler: auth #{inspect(auth, pretty: true)}")
161✔
123

124
        case try_ssl_handshake({:gen_tcp, sock}, auth) do
161✔
125
          {:ok, sock} ->
126
            tenant = if data.proxy, do: Supavisor.tenant(data.id)
161✔
127
            search_path = Supavisor.search_path(data.id)
161✔
128

129
            case send_startup(sock, auth, tenant, search_path) do
161✔
130
              :ok ->
131
                :ok = activate(sock)
160✔
132
                {:next_state, :authentication, %{data | sock: sock}}
160✔
133

134
              {:error, reason} ->
135
                Logger.error("DbHandler: Send startup error #{inspect(reason)}")
×
136
                maybe_reconnect_callback.(reason)
×
137
            end
138

139
          {:error, reason} ->
140
            Logger.error("DbHandler: Handshake error #{inspect(reason)}")
×
141
            maybe_reconnect_callback.(reason)
×
142
        end
143

144
      other ->
UNCOV
145
        Logger.error(
1✔
UNCOV
146
          "DbHandler: Connection failed #{inspect(other)} to #{inspect(auth.host)}:#{inspect(auth.port)}"
1✔
147
        )
148

UNCOV
149
        maybe_reconnect_callback.(other)
1✔
150
    end
151
  end
152

153
  def handle_event(:state_timeout, :connect, _state, data) do
154
    retry = data.reconnect_retries
×
155
    Logger.warning("DbHandler: Reconnect #{retry} to DB")
×
156

157
    {:keep_state, %{data | reconnect_retries: retry + 1}, {:next_event, :internal, :connect}}
×
158
  end
159

160
  def handle_event(:info, {proto, _, bin}, :authentication, data) when proto in @proto do
161
    dec_pkt = Server.decode(bin)
474✔
162
    Logger.debug("DbHandler: dec_pkt, #{inspect(dec_pkt, pretty: true)}")
474✔
163

164
    resp = Enum.reduce(dec_pkt, %{}, &handle_auth_pkts(&1, &2, data))
474✔
165

166
    case resp do
474✔
167
      {:authentication_sasl, nonce} ->
158✔
168
        {:keep_state, %{data | nonce: nonce}}
169

170
      {:authentication_server_first_message, server_proof} ->
158✔
171
        {:keep_state, %{data | server_proof: server_proof}}
172

173
      %{authentication_server_final_message: _server_final} ->
×
174
        :keep_state_and_data
175

176
      %{authentication_ok: true} ->
×
177
        :keep_state_and_data
178

179
      :authentication ->
×
180
        :keep_state_and_data
181

UNCOV
182
      :authentication_md5 ->
1✔
183
        {:keep_state, data}
184

185
      {:error_response, ["SFATAL", "VFATAL", "C28P01", reason, _, _, _]} ->
186
        handle_authentication_error(data, reason)
×
187
        Logger.error("DbHandler: Auth error #{inspect(reason)}")
×
188
        {:stop, :invalid_password, data}
×
189

190
      {:error_response, error} ->
191
        Logger.error("DbHandler: Error auth response #{inspect(error)}")
×
192
        {:stop, {:encode_and_forward, error}}
193

194
      {:ready_for_query, acc} ->
195
        ps = acc.ps
157✔
196

197
        Logger.debug(
157✔
198
          "DbHandler: DB ready_for_query: #{inspect(acc.db_state)} #{inspect(ps, pretty: true)}"
157✔
199
        )
200

201
        if data.proxy do
157✔
202
          bin_ps = Server.encode_parameter_status(ps)
×
203
          send(data.caller, {:parameter_status, bin_ps})
×
204
        else
205
          Supavisor.set_parameter_status(data.id, ps)
157✔
206
        end
207

208
        {:next_state, :idle, %{data | parameter_status: ps, reconnect_retries: 0},
157✔
209
         {:next_event, :internal, :check_buffer}}
210

211
      other ->
212
        Logger.error("DbHandler: Undefined auth response #{inspect(other)}")
×
213
        {:stop, :auth_error, data}
×
214
    end
215
  end
216

217
  def handle_event(:internal, :check_buffer, :idle, %{reply: from} = data) when from != nil do
218
    Logger.debug("DbHandler: Check buffer")
146✔
219
    {:next_state, :busy, %{data | reply: nil}, {:reply, from, data.sock}}
146✔
220
  end
221

222
  def handle_event(:internal, :check_buffer, :idle, %{buffer: buff, caller: caller} = data)
223
      when is_pid(caller) do
224
    if buff != [] do
×
225
      Logger.debug("DbHandler: Buffer is not empty, try to send #{IO.iodata_length(buff)} bytes")
×
226
      buff = Enum.reverse(buff)
×
227
      :ok = sock_send(data.sock, buff)
×
228
    end
229

230
    {:next_state, :busy, %{data | buffer: []}}
×
231
  end
232

233
  # check if it needs to apply queries from the anon buffer
234
  def handle_event(:internal, :check_anon_buffer, _, %{anon_buffer: buff, caller: nil} = data) do
UNCOV
235
    Logger.debug("DbHandler: Check anon buffer")
15✔
236

UNCOV
237
    if buff != [] do
15✔
238
      Logger.debug(
×
239
        "DbHandler: Anon buffer is not empty, try to send #{IO.iodata_length(buff)} bytes"
×
240
      )
241

242
      buff = Enum.reverse(buff)
×
243
      :ok = sock_send(data.sock, buff)
×
244
    end
245

246
    {:keep_state, %{data | anon_buffer: []}}
247
  end
248

249
  def handle_event(:internal, :check_anon_buffer, _, _) do
250
    Logger.debug("DbHandler: Anon buffer is empty")
1,342✔
251
    :keep_state_and_data
252
  end
253

254
  # the process received message from db without linked caller
255
  def handle_event(:info, {proto, _, bin}, _, %{caller: nil}) when proto in @proto do
256
    Logger.debug("DbHandler: Got db response #{inspect(bin)} when caller was nil")
×
257
    :keep_state_and_data
258
  end
259

260
  def handle_event(:info, {proto, _, bin}, _, %{replica_type: :read} = data)
261
      when proto in @proto do
262
    Logger.debug("DbHandler: Got read replica message #{inspect(bin)}")
×
263
    pkts = Server.decode(bin)
×
264

265
    resp =
×
266
      cond do
267
        Server.has_read_only_error?(pkts) ->
268
          Logger.error("DbHandler: read only error")
×
269

270
          with [_] <- pkts do
×
271
            # need to flush ready_for_query if it's not in same packet
272
            :ok = receive_ready_for_query()
×
273
          end
274

275
          :read_sql_error
276

277
        List.last(pkts).tag == :ready_for_query ->
×
278
          :ready_for_query
279

280
        true ->
×
281
          :continue
282
      end
283

284
    if resp != :continue do
×
285
      :ok = ClientHandler.db_status(data.caller, resp, bin)
×
286
      {_, stats} = Telem.network_usage(:db, data.sock, data.id, data.stats)
×
287
      {:keep_state, %{data | stats: stats, caller: handler_caller(data)}}
288
    else
289
      :keep_state_and_data
290
    end
291
  end
292

293
  # forward the message to the client
294
  def handle_event(:info, {proto, _, bin}, _, %{caller: caller, reply: nil} = data)
295
      when is_pid(caller) and proto in @proto do
296
    Logger.debug("DbHandler: Got write replica message  #{inspect(bin)}")
6,256✔
297

298
    if String.ends_with?(bin, Server.ready_for_query()) do
6,256✔
299
      HandlerHelpers.activate(data.sock)
1,357✔
300

301
      {_, stats} =
1,357✔
302
        if not data.proxy,
1,357✔
303
          do: Telem.network_usage(:db, data.sock, data.id, data.stats),
1,357✔
304
          else: {nil, data.stats}
×
305

306
      # in transaction mode, we need to notify the client when the transaction is finished,
307
      # after which it will unlink the direct db connection process from itself.
308
      data =
1,357✔
309
        if data.mode == :transaction do
1,357✔
UNCOV
310
          ClientHandler.db_status(data.caller, :ready_for_query, bin)
15✔
UNCOV
311
          %{data | stats: stats, caller: nil, client_sock: nil, active_count: 0}
15✔
312
        else
313
          HandlerHelpers.sock_send(data.client_sock, bin)
1,342✔
314

315
          {_, client_stats} =
1,342✔
316
            if not data.proxy,
1,342✔
317
              do: Telem.network_usage(:client, data.client_sock, data.id, data.client_stats),
1,342✔
318
              else: {nil, data.client_stats}
×
319

320
          %{data | stats: stats, active_count: 0, client_stats: client_stats}
1,342✔
321
        end
322

323
      {:next_state, :idle, data, {:next_event, :internal, :check_anon_buffer}}
1,357✔
324
    else
325
      if data.active_count > @switch_active_count,
4,899✔
326
        do: HandlerHelpers.active_once(data.sock)
2,581✔
327

328
      HandlerHelpers.sock_send(data.client_sock, bin)
4,899✔
329
      {:keep_state, %{data | active_count: data.active_count + 1}}
4,899✔
330
    end
331
  end
332

333
  def handle_event(:info, {:handle_ps, payload, bin}, _state, data) do
334
    Logger.notice("DbHandler: Apply prepare statement change #{inspect(payload)}")
×
335

336
    {:keep_state, %{data | anon_buffer: [bin | data.anon_buffer]},
×
337
     {:next_event, :internal, :check_anon_buffer}}
338
  end
339

340
  def handle_event({:call, from}, {:checkout, sock, caller}, state, data) do
341
    Logger.debug("DbHandler: checkout call when state was #{state}")
161✔
342

343
    # store the reply ref and send it when the state is idle
344
    if state in [:idle, :busy],
161✔
UNCOV
345
      do: {:keep_state, %{data | client_sock: sock, caller: caller}, {:reply, from, data.sock}},
15✔
346
      else: {:keep_state, %{data | client_sock: sock, caller: caller, reply: from}}
347
  end
348

349
  def handle_event({:call, from}, :ps, _, data) do
350
    Logger.debug("DbHandler: get parameter status")
×
351
    {:keep_state_and_data, {:reply, from, data.parameter_status}}
×
352
  end
353

354
  def handle_event(_, {closed, _}, :busy, data) when closed in @sock_closed do
355
    {:stop, {:shutdown, :db_termination}, data}
×
356
  end
357

358
  def handle_event(_, {closed, _}, state, data) when closed in @sock_closed do
359
    Logger.error("DbHandler: Connection closed when state was #{state}")
×
360

361
    if Application.get_env(:supavisor, :reconnect_on_db_close),
×
362
      do: {:next_state, :connect, data, {:state_timeout, reconnect_timeout(data), :connect}},
×
363
      else: {:stop, {:shutdown, :db_termination}, data}
×
364
  end
365

366
  # linked client_handler went down
367
  def handle_event(_, {:EXIT, pid, reason}, state, data) do
UNCOV
368
    if reason != :normal do
5✔
UNCOV
369
      Logger.error(
5✔
370
        "DbHandler: ClientHandler #{inspect(pid)} went down with reason #{inspect(reason)}"
371
      )
372
    end
373

UNCOV
374
    if state == :busy or data.mode == :session do
5✔
UNCOV
375
      sock_send(data.sock, Server.terminate_message())
5✔
UNCOV
376
      :gen_tcp.close(elem(data.sock, 1))
5✔
UNCOV
377
      {:stop, {:client_handler_down, data.mode}}
5✔
378
    else
379
      {:keep_state, %{data | caller: nil, buffer: []}}
380
    end
381
  end
382

383
  def handle_event({:call, from}, :get_state_and_mode, state, data) do
143✔
384
    {:keep_state_and_data, {:reply, from, {state, data.mode}}}
143✔
385
  end
386

387
  def handle_event(type, content, state, data) do
UNCOV
388
    msg = [
11✔
389
      {"type", type},
390
      {"content", content},
391
      {"state", state},
392
      {"data", data}
393
    ]
394

UNCOV
395
    Logger.debug("DbHandler: Undefined msg: #{inspect(msg, pretty: true)}")
11✔
396

397
    :keep_state_and_data
398
  end
399

400
  @impl true
401
  def terminate(:shutdown, _state, data) do
402
    Telem.handler_action(:db_handler, :stopped, data.id)
5✔
403
    :ok
404
  end
405

406
  def terminate(reason, state, data) do
407
    Telem.handler_action(:db_handler, :stopped, data.id)
147✔
408

409
    if data.client_sock != nil do
147✔
410
      message =
146✔
411
        case reason do
412
          {:encode_and_forward, msg} -> Server.encode_error_message(msg)
×
413
          _ -> Server.error_message("XX000", inspect(reason))
146✔
414
        end
415

416
      HandlerHelpers.sock_send(data.client_sock, message)
146✔
417
    end
418

419
    Logger.error(
147✔
420
      "DbHandler: Terminating with reason #{inspect(reason)} when state was #{inspect(state)}"
421
    )
422
  end
423

424
  @spec try_ssl_handshake(Supavisor.tcp_sock(), map) :: {:ok, Supavisor.sock()} | {:error, term()}
425
  defp try_ssl_handshake(sock, %{upstream_ssl: true} = auth) do
426
    case sock_send(sock, Server.ssl_request()) do
×
427
      :ok -> ssl_recv(sock, auth)
×
428
      error -> error
×
429
    end
430
  end
431

432
  defp try_ssl_handshake(sock, _), do: {:ok, sock}
161✔
433

434
  @spec ssl_recv(Supavisor.tcp_sock(), map) :: {:ok, Supavisor.ssl_sock()} | {:error, term}
435
  defp ssl_recv({:gen_tcp, sock} = s, auth) do
436
    case :gen_tcp.recv(sock, 1, 15_000) do
×
437
      {:ok, <<?S>>} -> ssl_connect(s, auth)
×
438
      {:ok, <<?N>>} -> {:error, :ssl_not_available}
×
439
      {:error, _} = error -> error
×
440
    end
441
  end
442

443
  @spec ssl_connect(Supavisor.tcp_sock(), map, pos_integer) ::
444
          {:ok, Supavisor.ssl_sock()} | {:error, term}
445
  defp ssl_connect({:gen_tcp, sock}, auth, timeout \\ 5000) do
446
    opts =
×
447
      case auth.upstream_verify do
×
448
        :peer ->
×
449
          [
450
            verify: :verify_peer,
451
            cacerts: [auth.upstream_tls_ca],
×
452
            # unclear behavior on pg14
453
            server_name_indication: auth.sni_hostname || auth.host,
×
454
            customize_hostname_check: [{:match_fun, fn _, _ -> true end}]
×
455
          ]
456

457
        :none ->
×
458
          [verify: :verify_none]
459
      end
460

461
    case :ssl.connect(sock, opts, timeout) do
×
462
      {:ok, ssl_sock} ->
×
463
        {:ok, {:ssl, ssl_sock}}
464

465
      {:error, reason} ->
×
466
        {:error, reason}
467
    end
468
  end
469

470
  @spec send_startup(Supavisor.sock(), map(), String.t() | nil, String.t() | nil) ::
471
          :ok | {:error, term}
472
  def send_startup(sock, auth, tenant, search_path) do
473
    user =
161✔
474
      if is_nil(tenant), do: get_user(auth), else: "#{get_user(auth)}.#{tenant}"
161✔
475

476
    msg =
160✔
477
      :pgo_protocol.encode_startup_message(
478
        [
479
          {"user", user},
480
          {"database", auth.database},
160✔
481
          {"application_name", auth.application_name}
160✔
482
        ] ++ if(search_path, do: [{"options", "--search_path=#{search_path}"}], else: [])
160✔
483
      )
484

485
    sock_send(sock, msg)
160✔
486
  end
487

488
  @spec sock_send(Supavisor.sock(), iodata) :: :ok | {:error, term}
489
  defp sock_send({mod, sock}, data) do
490
    mod.send(sock, data)
165✔
491
  end
492

493
  @spec activate(Supavisor.sock()) :: :ok | {:error, term}
494
  defp activate({:gen_tcp, sock}) do
495
    :inet.setopts(sock, active: true)
160✔
496
  end
497

498
  defp activate({:ssl, sock}) do
499
    :ssl.setopts(sock, active: true)
×
500
  end
501

502
  defp get_user(auth) do
503
    if auth.require_user do
319✔
UNCOV
504
      auth.secrets.().db_user
22✔
505
    else
506
      auth.secrets.().user
297✔
507
    end
508
  end
509

510
  @spec receive_ready_for_query() :: :ok | :timeout_error
511
  defp receive_ready_for_query do
512
    receive do
×
513
      {_proto, _socket, <<?Z, 5::32, ?I>>} ->
×
514
        :ok
515
    after
516
      15_000 -> :timeout_error
×
517
    end
518
  end
519

520
  @spec handler_caller(map()) :: pid() | nil
521
  defp handler_caller(%{mode: :session} = data), do: data.caller
×
522
  defp handler_caller(_), do: nil
×
523

524
  @spec check_ready(binary()) ::
525
          {:ready_for_query, :idle | :transaction_block | :failed_transaction_block} | :continue
526
  def check_ready(bin) do
UNCOV
527
    bin_size = byte_size(bin)
9✔
528

UNCOV
529
    case bin do
9✔
530
      <<_::binary-size(bin_size - 6), 90, 0, 0, 0, 5, status_indicator::binary>> ->
UNCOV
531
        indicator =
6✔
532
          case status_indicator do
UNCOV
533
            <<?I>> -> :idle
2✔
UNCOV
534
            <<?T>> -> :transaction_block
2✔
UNCOV
535
            <<?E>> -> :failed_transaction_block
2✔
536
            _ -> :continue
×
537
          end
538

539
        {:ready_for_query, indicator}
540

UNCOV
541
      _ ->
3✔
542
        :continue
543
    end
544
  end
545

546
  @spec handle_auth_pkts(map(), map(), map()) :: any()
547
  defp handle_auth_pkts(%{tag: :parameter_status, payload: {k, v}}, acc, _),
548
    do: update_in(acc, [:ps], fn ps -> Map.put(ps || %{}, k, v) end)
2,041✔
549

550
  defp handle_auth_pkts(%{tag: :ready_for_query, payload: db_state}, acc, _),
157✔
551
    do: {:ready_for_query, Map.put(acc, :db_state, db_state)}
552

553
  defp handle_auth_pkts(%{tag: :backend_key_data, payload: payload}, acc, data) do
554
    key = self()
157✔
555
    conn = %{host: data.auth.host, port: data.auth.port, ip_ver: data.auth.ip_version}
157✔
556
    Registry.register(Supavisor.Registry.PoolPids, key, Map.merge(payload, conn))
157✔
557
    Logger.debug("DbHandler: Backend #{inspect(key)} data: #{inspect(payload)}")
157✔
558
    Map.put(acc, :backend_key_data, payload)
157✔
559
  end
560

561
  defp handle_auth_pkts(%{payload: {:authentication_sasl_password, methods_b}}, _, data) do
562
    nonce =
158✔
563
      case Server.decode_string(methods_b) do
564
        {:ok, req_method, _} ->
565
          Logger.debug("DbHandler: SASL method #{inspect(req_method)}")
158✔
566
          nonce = :pgo_scram.get_nonce(16)
158✔
567
          user = get_user(data.auth)
158✔
568
          client_first = :pgo_scram.get_client_first(user, nonce)
158✔
569
          client_first_size = IO.iodata_length(client_first)
158✔
570

571
          sasl_initial_response = [
158✔
572
            "SCRAM-SHA-256",
573
            0,
574
            <<client_first_size::32-integer>>,
575
            client_first
576
          ]
577

578
          bin = :pgo_protocol.encode_scram_response_message(sasl_initial_response)
158✔
579
          :ok = HandlerHelpers.sock_send(data.sock, bin)
158✔
580
          nonce
158✔
581

582
        other ->
583
          Logger.error("DbHandler: Undefined sasl method #{inspect(other)}")
×
584
          nil
585
      end
586

587
    {:authentication_sasl, nonce}
588
  end
589

590
  defp handle_auth_pkts(
591
         %{payload: {:authentication_server_first_message, server_first}},
592
         _,
593
         data
594
       )
595
       when data.auth.require_user == false do
596
    nonce = data.nonce
148✔
597
    server_first_parts = Helpers.parse_server_first(server_first, nonce)
148✔
598

599
    {client_final_message, server_proof} =
148✔
600
      Helpers.get_client_final(
601
        :auth_query,
602
        data.auth.secrets.(),
148✔
603
        server_first_parts,
604
        nonce,
605
        data.auth.secrets.().user,
148✔
606
        "biws"
607
      )
608

609
    bin = :pgo_protocol.encode_scram_response_message(client_final_message)
148✔
610
    :ok = HandlerHelpers.sock_send(data.sock, bin)
148✔
611

612
    {:authentication_server_first_message, server_proof}
613
  end
614

615
  defp handle_auth_pkts(
616
         %{payload: {:authentication_server_first_message, server_first}},
617
         _,
618
         data
619
       ) do
UNCOV
620
    nonce = data.nonce
10✔
UNCOV
621
    server_first_parts = :pgo_scram.parse_server_first(server_first, nonce)
10✔
622

UNCOV
623
    {client_final_message, server_proof} =
10✔
624
      :pgo_scram.get_client_final(
625
        server_first_parts,
626
        nonce,
UNCOV
627
        data.auth.user,
10✔
UNCOV
628
        data.auth.secrets.().password
10✔
629
      )
630

UNCOV
631
    bin = :pgo_protocol.encode_scram_response_message(client_final_message)
10✔
UNCOV
632
    :ok = HandlerHelpers.sock_send(data.sock, bin)
10✔
633

634
    {:authentication_server_first_message, server_proof}
635
  end
636

637
  defp handle_auth_pkts(
638
         %{payload: {:authentication_server_final_message, server_final}},
639
         acc,
640
         _data
641
       ),
642
       do: Map.put(acc, :authentication_server_final_message, server_final)
157✔
643

644
  defp handle_auth_pkts(
645
         %{payload: :authentication_ok},
646
         acc,
647
         _data
648
       ),
649
       do: Map.put(acc, :authentication_ok, true)
157✔
650

651
  defp handle_auth_pkts(%{payload: {:authentication_md5_password, salt}} = dec_pkt, _, data) do
UNCOV
652
    Logger.debug("DbHandler: dec_pkt, #{inspect(dec_pkt, pretty: true)}")
1✔
653

UNCOV
654
    digest =
1✔
UNCOV
655
      if data.auth.method == :password do
1✔
UNCOV
656
        Helpers.md5([data.auth.password.(), data.auth.user])
1✔
657
      else
658
        data.auth.secrets.().secret
×
659
      end
660

UNCOV
661
    payload = ["md5", Helpers.md5([digest, salt]), 0]
1✔
UNCOV
662
    bin = [?p, <<IO.iodata_length(payload) + 4::signed-32>>, payload]
1✔
UNCOV
663
    :ok = HandlerHelpers.sock_send(data.sock, bin)
1✔
664
    :authentication_md5
665
  end
666

667
  defp handle_auth_pkts(%{tag: :error_response, payload: error}, _acc, _data),
×
668
    do: {:error_response, error}
669

670
  defp handle_auth_pkts(_e, acc, _data), do: acc
×
671

672
  @spec handle_authentication_error(map(), String.t()) :: any()
673
  defp handle_authentication_error(%{proxy: false} = data, reason) do
674
    tenant = Supavisor.tenant(data.id)
×
675

676
    :erpc.multicast([node() | Node.list()], fn ->
×
677
      Cachex.del(Supavisor.Cache, {:secrets, tenant, data.user})
×
678
      Cachex.del(Supavisor.Cache, {:secrets_check, tenant, data.user})
×
679

680
      Registry.dispatch(Supavisor.Registry.TenantClients, data.id, fn entries ->
×
681
        for {client_handler, _meta} <- entries,
×
682
            do: send(client_handler, {:disconnect, reason})
683
      end)
684
    end)
685

686
    Supavisor.stop(data.id)
×
687
  end
688

689
  defp handle_authentication_error(%{proxy: true}, _reason), do: :ok
×
690

691
  @spec reconnect_timeout(map()) :: pos_integer()
692
  def reconnect_timeout(%{proxy: true}),
×
693
    do: @reconnect_timeout_proxy
694

UNCOV
695
  def reconnect_timeout(_),
1✔
696
    do: @reconnect_timeout
697
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc