• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supavisor / e5e7ebfe80dbec4965226225050d4ef5c8216e88-PR-605

21 Feb 2025 02:35PM UTC coverage: 45.973% (-0.03%) from 46.003%
e5e7ebfe80dbec4965226225050d4ef5c8216e88-PR-605

Pull #605

github

hauleth
fix: remaining SSL connections that need to set `verify_none` option
Pull Request #605: fix: remaining SSL connections that need to set `verify_none` option

2 of 9 new or added lines in 3 files covered. (22.22%)

267 existing lines in 26 files now uncovered.

959 of 2086 relevant lines covered (45.97%)

635.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.89
/lib/supavisor/protocol/server.ex
1
defmodule Supavisor.Protocol.Server do
2
  @moduledoc """
3
  The Supavisor.Protocol.Server module is responsible for decoding data received from the PostgreSQL server. It provides several functions to decode payloads from different types of messages.
4

5
  Message Formats: https://www.postgresql.org/docs/current/protocol-message-formats.html
6
  Error codes https://www.postgresql.org/docs/current/errcodes-appendix.html
7
  """
8
  require Logger
9
  alias Supavisor.Protocol.PgType
10

11
  @pkt_header_size 5
12
  @authentication_ok <<?R, 8::32, 0::32>>
13
  @ready_for_query <<?Z, 5::32, ?I>>
14
  @ssl_request <<8::32, 1234::16, 5679::16>>
15
  @scram_request <<?R, 23::32, 10::32, "SCRAM-SHA-256", 0, 0>>
16
  @msg_cancel_header <<16::32, 1234::16, 5678::16>>
17
  @application_name <<?S, 31::32, "application_name", 0, "Supavisor", 0>>
18
  @terminate_message <<?X, 4::32>>
19

20
  defmodule Pkt do
21
    @moduledoc "Representing a packet structure with tag, length, and payload fields."
22
    defstruct([:tag, :len, :payload])
23

24
    @type t :: %Pkt{
25
            tag: atom,
26
            len: integer,
27
            payload: any
28
          }
29
  end
30

31
  defmacro cancel_message(pid, key) do
32
    quote do
33
      <<unquote(@msg_cancel_header)::binary, unquote(pid)::32, unquote(key)::32>>
34
    end
35
  end
36

37
  @spec decode(iodata()) :: [Pkt.t()] | []
38
  def decode(data) do
39
    decode(data, [])
476✔
40
  end
41

42
  def decode(data, acc) when byte_size(data) >= @pkt_header_size do
43
    {:ok, pkt, rest} = decode_pkt(data)
2,988✔
44
    decode(rest, [pkt | acc])
2,988✔
45
  end
46

47
  def decode(_, acc) do
48
    Enum.reverse(acc)
476✔
49
  end
50

51
  def packet(tag, pkt_len, payload) do
52
    %Pkt{
×
53
      tag: tag,
54
      len: pkt_len + 1,
55
      payload: decode_payload(tag, payload)
56
    }
57
  end
58

59
  def decode_pkt(<<char::integer-8, pkt_len::integer-32, rest::binary>>, decode_payload \\ true) do
60
    tag = tag(char)
3,340✔
61
    payload_len = pkt_len - 4
3,340✔
62

63
    <<bin_payload::binary-size(payload_len), rest2::binary>> = rest
3,340✔
64

65
    payload =
3,340✔
66
      if decode_payload do
×
67
        decode_payload(tag, bin_payload)
3,340✔
68
      else
69
        nil
70
      end
71

72
    {:ok, %Pkt{tag: tag, len: pkt_len + 1, payload: payload}, rest2}
3,340✔
73
  end
74

75
  def tag(char) do
76
    case char do
3,340✔
77
      ?R -> :authentication
631✔
78
      ?K -> :backend_key_data
158✔
79
      ?2 -> :bind_complete
×
80
      ?3 -> :close_complete
×
81
      ?C -> :command_complete
×
82
      ?d -> :copy_data
×
83
      ?c -> :copy_done
×
84
      ?G -> :copy_in_response
×
85
      ?H -> :copy_out_response
×
86
      ?W -> :copy_both_response
×
87
      ?D -> :data_row
×
88
      ?I -> :empty_query_response
×
UNCOV
89
      ?E -> :error_response
1✔
90
      ?V -> :function_call_response
×
91
      ?n -> :no_data
×
92
      ?N -> :notice_response
×
93
      ?A -> :notification_response
×
94
      ?t -> :parameter_description
×
95
      ?S -> :parameter_status
2,041✔
96
      ?1 -> :parse_complete
×
97
      ?s -> :portal_suspended
×
98
      ?Z -> :ready_for_query
157✔
99
      ?T -> :row_description
×
100
      ?p -> :password_message
352✔
101
      _ -> :undefined
×
102
    end
103
  end
104

105
  def decode_payload(:authentication, payload) do
106
    case payload do
631✔
107
      <<0::integer-32>> ->
157✔
108
        :authentication_ok
109

110
      <<2::integer-32>> ->
×
111
        :authentication_kerberos_v5
112

113
      <<3::integer-32>> ->
×
114
        :authentication_cleartext_password
115

UNCOV
116
      <<5::integer-32, salt::binary-4>> ->
1✔
117
        {:authentication_md5_password, salt}
118

119
      <<6::integer-32>> ->
×
120
        :authentication_scm_credential
121

122
      <<7::integer-32>> ->
×
123
        :authentication_gss
124

125
      <<8::integer-32, rest::binary>> ->
×
126
        {:authentication_gss_continue, rest}
127

128
      <<9::integer-32>> ->
×
129
        :authentication_sspi
130

131
      <<10::integer-32, methods_b::binary>> ->
158✔
132
        {:authentication_sasl_password, methods_b}
133

134
      <<11::integer-32, server_first::binary>> ->
158✔
135
        {:authentication_server_first_message, server_first}
136

137
      <<12::integer-32, server_final_msg::binary>> ->
157✔
138
        {:authentication_server_final_message, server_final_msg}
139

140
      other ->
×
141
        {:undefined, other}
142
    end
143
  end
144

145
  def decode_payload(:parameter_status, payload) do
146
    case String.split(payload, <<0>>, trim: true) do
2,041✔
147
      [k, v] -> {k, v}
2,041✔
148
      _ -> :undefined
×
149
    end
150
  end
151

152
  def decode_payload(:backend_key_data, <<pid::integer-32, key::integer-32>>) do
153
    %{pid: pid, key: key}
158✔
154
  end
155

156
  def decode_payload(:ready_for_query, payload) do
157
    case payload do
157✔
158
      <<"I">> -> :idle
157✔
159
      <<"T">> -> :transaction
×
160
      <<"E">> -> :error
×
161
    end
162
  end
163

164
  def decode_payload(:parse_complete, "") do
×
165
    :parse_complete
166
  end
167

168
  def decode_payload(:parameter_description, <<count::integer-16, rest::binary>>) do
×
169
    {count, decode_parameter_description(rest, [])}
170
  end
171

172
  def decode_payload(:row_description, <<count::integer-16, rest::binary>>) do
173
    decode_row_description(count, rest, [])
×
174
  end
175

176
  def decode_payload(:data_row, _payload) do
×
177
    nil
178
  end
179

180
  # https://www.postgresql.org/docs/current/protocol-error-fields.html
181
  def decode_payload(:error_response, payload) do
UNCOV
182
    :binary.split(payload, <<0>>, [:global, :trim_all])
1✔
183
  end
184

185
  def decode_payload(
186
        :password_message,
187
        <<"SCRAM-SHA-256", 0, _::32, channel::binary-3, bin::binary>>
188
      ) do
189
    case kv_to_map(bin) do
177✔
190
      {:ok, map} ->
191
        channel =
177✔
192
          case channel do
193
            "n,," -> "biws"
177✔
194
            "y,," -> "eSws"
×
195
          end
196

197
        {:scram_sha_256, Map.put(map, "c", channel)}
198

199
      {:error, _} ->
×
200
        :undefined
201
    end
202
  end
203

204
  def decode_payload(:password_message, "md5" <> _ = bin) do
205
    case :binary.split(bin, <<0>>) do
×
206
      [digest, ""] -> {:md5, digest}
×
207
      _ -> :undefined
×
208
    end
209
  end
210

211
  def decode_payload(:password_message, bin) do
212
    case kv_to_map(bin) do
175✔
213
      {:ok, map} -> {:first_msg_response, map}
175✔
214
      {:error, _} -> :undefined
×
215
    end
216
  end
217

218
  def decode_payload(_, _) do
×
219
    :undefined
220
  end
221

222
  @spec kv_to_map(String.t()) :: {:ok, map()} | {:error, String.t()}
223
  def kv_to_map(bin) do
224
    Regex.scan(~r/(\w+)=([^,]*)/, bin)
225
    |> Map.new(fn [_, k, v] -> {k, v} end)
879✔
226
    |> case do
352✔
227
      map when map_size(map) > 0 -> {:ok, map}
352✔
228
      _ -> {:error, "invalid key value string"}
×
229
    end
230
  end
231

232
  def decode_row_description(0, "", acc), do: Enum.reverse(acc)
×
233

234
  def decode_row_description(count, rest, acc) do
235
    case decode_string(rest) do
×
236
      {:ok, field_name,
237
       <<table_oid::integer-32, attr_num::integer-16, data_type_oid::integer-32,
238
         data_type_size::integer-16, type_modifier::integer-32, format_code::integer-16,
239
         tail::binary>>} ->
240
        case decode_format_code(format_code) do
×
241
          {:ok, format} ->
242
            field = %{
×
243
              name: field_name,
244
              type_info: PgType.type(data_type_oid),
245
              table_oid: table_oid,
246
              attr_number: attr_num,
247
              data_type_oid: data_type_oid,
248
              data_type_size: data_type_size,
249
              type_modifier: type_modifier,
250
              format: format
251
            }
252

253
            decode_row_description(count - 1, tail, [field | acc])
×
254
        end
255

256
      _ ->
×
257
        {:error, :decode}
258
    end
259
  end
260

261
  def decode_format_code(0) do
×
262
    {:ok, :text}
263
  end
264

265
  def decode_format_code(1) do
×
266
    {:ok, :binary}
267
  end
268

269
  def decode_format_code(_) do
×
270
    {:error, :unknown_format_code}
271
  end
272

273
  def decode_string(bin) do
274
    case :binary.match(bin, <<0>>) do
158✔
275
      :nomatch ->
×
276
        {:error, :not_null_terminated}
277

278
      {pos, 1} ->
279
        {string, <<0, rest::binary>>} = :erlang.split_binary(bin, pos)
158✔
280
        {:ok, string, rest}
158✔
281
    end
282
  end
283

284
  @spec scram_request() :: iodata
285
  def scram_request do
177✔
286
    @scram_request
287
  end
288

289
  @spec md5_request(<<_::32>>) :: iodata
290
  def md5_request(salt) do
×
291
    [<<?R, 12::32, 5::32>>, salt]
292
  end
293

294
  @spec exchange_first_message(binary, binary | boolean, pos_integer) :: binary
295
  def exchange_first_message(nonce, salt \\ false, iterations \\ 4096) do
296
    secret =
177✔
297
      if salt do
298
        salt
148✔
299
      else
UNCOV
300
        :pgo_scram.get_nonce(16) |> Base.encode64()
29✔
301
      end
302

303
    server_nonce = :pgo_scram.get_nonce(16) |> Base.encode64()
177✔
304
    "r=#{nonce <> server_nonce},s=#{secret},i=#{iterations}"
177✔
305
  end
306

307
  @spec exchange_message(:first | :final, binary()) :: iodata()
308
  def exchange_message(type, message) do
309
    code =
354✔
310
      case type do
311
        :first ->
177✔
312
          11
313

314
        :final ->
177✔
315
          12
316
      end
317

318
    [<<?R, byte_size(message) + 8::32, code::32>>, message]
319
  end
320

321
  @spec error_message(binary(), binary()) :: iodata()
322
  def error_message(code, value) do
323
    message = ["SFATAL", 0, "VFATAL", 0, "C", code, 0, "M", value, 0, 0]
149✔
324
    [<<?E, IO.iodata_length(message) + 4::32>>, message]
325
  end
326

327
  @spec encode_error_message(list()) :: iodata()
328
  def encode_error_message(message) when is_list(message) do
329
    message = Enum.join(message, <<0>>) <> <<0, 0>>
×
330
    [<<?E, byte_size(message) + 4::32>>, message]
331
  end
332

333
  def decode_parameter_description("", acc), do: Enum.reverse(acc)
×
334

335
  def decode_parameter_description(<<oid::integer-32, rest::binary>>, acc) do
336
    decode_parameter_description(rest, [oid | acc])
×
337
  end
338

339
  def flush do
340
    <<?H, 4::integer-32>>
×
341
  end
342

343
  def sync do
344
    <<?S, 4::integer-32>>
×
345
  end
346

347
  def encode(query) do
348
    payload = [[], <<0>>, query, <<0>>, <<0, 0>>, []]
×
349
    payload_len = IO.iodata_length(payload) + 4
×
350
    [<<?P, payload_len::integer-32>>, payload]
351
  end
352

353
  def test_extended_query do
×
354
    [
355
      encode("select * from todos where id = 40;"),
356
      [<<68, 0, 0, 0, 6, 83>>, [], <<0>>],
357
      flush()
358
    ]
359
  end
360

361
  def select_1_response do
362
    <<84, 0, 0, 0, 33, 0, 1, 63, 99, 111, 108, 117, 109, 110, 63, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
×
363
      23, 0, 4, 255, 255, 255, 255, 0, 0, 68, 0, 0, 0, 11, 0, 1, 0, 0, 0, 1, 49, 67, 0, 0, 0, 13,
364
      83, 69, 76, 69, 67, 84, 32, 49, 0, 90, 0, 0, 0, 5, 73>>
365
  end
366

367
  def authentication_ok do
173✔
368
    @authentication_ok
369
  end
370

371
  @spec encode_parameter_status(map) :: iodata()
372
  def encode_parameter_status(ps) do
373
    for {key, value} <- ps do
20✔
374
      encode_pkt(:parameter_status, key, value)
375
    end
376
  end
377

378
  @spec encode_pkt(:parameter_status, binary, binary) :: iodata()
379
  def encode_pkt(:parameter_status, key, value) do
380
    payload = [key, <<0>>, value, <<0>>]
274✔
381
    len = IO.iodata_length(payload) + 4
274✔
382
    [<<?S, len::integer-32>>, payload]
383
  end
384

385
  @spec backend_key_data() :: {iodata(), binary}
386
  def backend_key_data do
387
    pid = System.unique_integer([:positive, :monotonic])
172✔
388
    key = :crypto.strong_rand_bytes(4)
172✔
389
    payload = <<pid::integer-32, key::binary>>
172✔
390
    len = IO.iodata_length(payload) + 4
172✔
391
    {<<?K, len::32>>, payload}
392
  end
393

394
  @spec ready_for_query() :: binary()
395
  def ready_for_query do
6,432✔
396
    @ready_for_query
397
  end
398

399
  # SSLRequest message
400
  @spec ssl_request() :: binary()
401
  def ssl_request do
×
402
    @ssl_request
403
  end
404

405
  # The startup packet payload is a list of key/value pairs, separated by null bytes
406
  def decode_startup_packet_payload(payload) do
407
    fields = String.split(payload, <<0>>, trim: true)
178✔
408

409
    # If the number of fields is odd, then the payload is malformed
410
    if rem(length(fields), 2) == 1 do
178✔
411
      {:error, :bad_startup_payload}
412
    else
413
      map =
178✔
414
        fields
415
        |> Enum.chunk_every(2)
416
        |> Enum.map(fn
417
          ["options" = k, v] -> {k, URI.decode_query(v)}
×
418
          [k, v] -> {k, v}
632✔
419
        end)
420
        |> Map.new()
421

422
      # We only do light validation on the fields in the payload. The only field we use at the
423
      # moment is `user`. If that's missing, this is a bad payload.
424
      if Map.has_key?(map, "user") do
178✔
425
        {:ok, map}
426
      else
427
        {:error, :bad_startup_payload}
428
      end
429
    end
430
  end
431

432
  def decode_startup_packet(<<len::integer-32, _protocol::binary-4, rest::binary>>) do
433
    with {:ok, payload} <- decode_startup_packet_payload(rest) do
178✔
434
      pkt = %{
178✔
435
        len: len,
436
        payload: payload,
437
        tag: :startup
438
      }
439

440
      {:ok, pkt}
441
    end
442
  end
443

444
  def decode_startup_packet(_) do
2✔
445
    {:error, :bad_startup_payload}
446
  end
447

448
  def encode_startup_packet(payload) do
449
    bin =
×
450
      Enum.reduce(payload, "", fn
451
        # remove options
452
        {"options", _}, acc ->
453
          acc
×
454

455
        {"application_name" = k, v}, acc ->
456
          <<k::binary, 0, v::binary, " via Supavisor", 0>> <> acc
×
457

458
        {k, v}, acc ->
459
          <<k::binary, 0, v::binary, 0>> <> acc
×
460
      end)
461

462
    <<byte_size(bin) + 9::32, 0, 3, 0, 0, bin::binary, 0>>
×
463
  end
464

465
  @spec has_read_only_error?(list) :: boolean
466
  def has_read_only_error?(pkts) do
467
    Enum.any?(pkts, fn
×
468
      %{payload: ["SERROR", "VERROR", "C25006" | _]} -> true
×
469
      _ -> false
×
470
    end)
471
  end
472

473
  @spec application_name() :: binary
474
  def application_name, do: @application_name
×
475

476
  @spec terminate_message() :: binary
UNCOV
477
  def terminate_message(), do: @terminate_message
5✔
478
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc