• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supavisor / 16470769502

23 Jul 2025 12:34PM UTC coverage: 57.067% (+1.7%) from 55.355%
16470769502

Pull #694

github

web-flow
Merge 78a9c0b2c into deaa48192
Pull Request #694: feat: improved named prepared statements support

175 of 217 new or added lines in 11 files covered. (80.65%)

16 existing lines in 4 files now uncovered.

1292 of 2264 relevant lines covered (57.07%)

1126.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.38
/lib/supavisor/protocol/server.ex
1
defmodule Supavisor.Protocol.Server do
2
  @moduledoc """
3
  The Supavisor.Protocol.Server module is responsible for decoding data received from the PostgreSQL server. It provides several functions to decode payloads from different types of messages.
4

5
  Message Formats: https://www.postgresql.org/docs/current/protocol-message-formats.html
6
  Error codes https://www.postgresql.org/docs/current/errcodes-appendix.html
7
  """
8
  require Logger
9
  alias Supavisor.Protocol.PgType
10

11
  @pkt_header_size 5
12
  @authentication_ok <<?R, 8::32, 0::32>>
13
  @ready_for_query <<?Z, 5::32, ?I>>
14
  @ssl_request <<8::32, 1234::16, 5679::16>>
15
  @scram_request <<?R, 23::32, 10::32, "SCRAM-SHA-256", 0, 0>>
16
  @msg_cancel_header <<16::32, 1234::16, 5678::16>>
17
  @application_name <<?S, 31::32, "application_name", 0, "Supavisor", 0>>
18
  @terminate_message <<?X, 4::32>>
19
  @parse_complete_message <<?1, 4::32>>
20
  @bind_complete_message <<?2, 4::32>>
21
  @close_complete_message <<?3, 4::32>>
22
  @flush <<?H, 4::integer-32>>
23
  @sync <<?S, 4::integer-32>>
24

25
  defmodule Pkt do
    @moduledoc """
    A single decoded protocol packet.

    Fields:

      * `:tag` - atom naming the message type
      * `:len` - packet length as an integer
      * `:payload` - decoded payload; its shape depends on the tag
      * `:bin` - the original binary the packet was decoded from
    """

    defstruct tag: nil, len: nil, payload: nil, bin: nil

    @type t :: %__MODULE__{
            tag: atom,
            len: integer,
            payload: any,
            bin: binary
          }
  end
41

42
  @doc """
  Expands to the binary of a cancel request for the given `pid` and `key`.

  The fixed cancel header is followed by both values encoded as 32-bit integers.
  """
  defmacro cancel_message(pid, key) do
    header = @msg_cancel_header

    quote do
      <<unquote(header)::binary, unquote(pid)::32, unquote(key)::32>>
    end
  end
47

48
  @doc "Expands to the SSL request message binary."
  defmacro ssl_request_message do
    @ssl_request
  end

  @doc "Expands to the parse-complete message binary."
  defmacro parse_complete_message do
    @parse_complete_message
  end

  @doc "Expands to the bind-complete message binary."
  defmacro bind_complete_message do
    @bind_complete_message
  end

  @doc "Expands to the close-complete message binary."
  defmacro close_complete_message do
    @close_complete_message
  end
52

53
  @doc """
  Expands to a binary pattern matching any message whose first byte is `?t`.
  """
  defmacro parameter_description_message_shape do
    quote(do: <<?t, _rest::binary>>)
  end
58

59
  @doc """
  Decodes every complete packet at the start of `data`.

  Returns the decoded packets in the order they appear, together with whatever
  trailing data could not be decoded yet.
  """
  @spec decode(iodata()) :: {:ok, [Pkt.t()], rest :: binary()}
  def decode(data) do
    decode(data, [])
  end
61

62
  @doc """
  Decodes the first packet at the start of a binary.

  A packet is a one-byte tag, a 32-bit length (which counts itself but not the
  tag), and `length - 4` bytes of payload.

  Returns:

    * `{:ok, pkt, rest}` - the decoded packet and the bytes that follow it
    * `{:error, :incomplete}` - the header is present but the payload is not
      fully available yet
    * `{:error, :bad_packet}` - the input is shorter than a header, or the
      declared length is smaller than the 4 bytes of the length field itself
  """
  @spec decode_pkt(binary()) ::
          {:ok, Pkt.t(), binary()} | {:error, :bad_packet} | {:error, :incomplete}
  # The length field counts its own 4 bytes, so anything smaller can never be
  # completed by reading more data. Without this clause the negative payload
  # size simply fails to match below and the packet is reported as incomplete
  # forever.
  def decode_pkt(<<_char::integer-8, pkt_len::integer-32, _rest::binary>>) when pkt_len < 4,
    do: {:error, :bad_packet}

  def decode_pkt(<<char::integer-8, pkt_len::integer-32, rest::binary>>) do
    payload_len = pkt_len - 4

    case rest do
      <<bin_payload::binary-size(payload_len), rest2::binary>> ->
        tag = tag(char)
        payload = decode_payload(tag, bin_payload)

        pkt = %Pkt{
          tag: tag,
          # Total size on the wire: the declared length plus the tag byte.
          len: pkt_len + 1,
          payload: payload,
          bin: <<char, pkt_len::32, bin_payload::binary>>
        }

        {:ok, pkt, rest2}

      _ ->
        {:error, :incomplete}
    end
  end

  def decode_pkt(_), do: {:error, :bad_packet}
87

88
  @doc """
  Splits a binary at its first null byte.

  Returns `{:ok, string, rest}` where `string` is everything before the null
  byte and `rest` everything after it, or `{:error, :not_null_terminated}` when
  the binary contains no null byte.
  """
  @spec decode_string(binary()) :: {:ok, binary(), binary()} | {:error, :not_null_terminated}
  def decode_string(bin) do
    case :binary.split(bin, <<0>>) do
      [string, rest] -> {:ok, string, rest}
      [_unterminated] -> {:error, :not_null_terminated}
    end
  end
99

100
  @doc """
  Builds the MD5 authentication request for the given 4-byte `salt`.

  The result is iodata: the fixed request header followed by the salt.
  """
  @spec md5_request(<<_::32>>) :: iodata()
  def md5_request(salt) do
    header = <<?R, 12::32, 5::32>>
    [header, salt]
  end
102

103
  @doc """
  Builds the `r=...,s=...,i=...` string for the first SCRAM exchange step.

  A fresh server nonce is generated and appended to the client `nonce`. When
  `salt` is falsy, the server nonce is used in the `s=` position instead.
  """
  @spec exchange_first_message(binary, binary | boolean, pos_integer) :: binary
  def exchange_first_message(nonce, salt \\ false, iterations \\ 4096) do
    server_nonce = Base.encode64(:pgo_scram.get_nonce(16))
    combined_nonce = nonce <> server_nonce
    secret = salt || server_nonce

    "r=#{combined_nonce},s=#{secret},i=#{iterations}"
  end
113

114
  @doc """
  Wraps a SCRAM exchange `message` in an authentication packet.

  `:first` uses authentication code 11 and `:final` uses code 12. The result
  is iodata: the packet header followed by the unmodified message.
  """
  @spec exchange_message(:first | :final, binary()) :: iodata()
  def exchange_message(type, message) do
    auth_code =
      case type do
        :first -> 11
        :final -> 12
      end

    # 4 bytes for the length field plus 4 bytes for the authentication code.
    packet_len = byte_size(message) + 8
    [<<?R, packet_len::32, auth_code::32>>, message]
  end
124

125
  @doc """
  Builds a FATAL error packet carrying the given `code` and message `value`.

  The result is iodata: the packet header followed by the null-separated
  severity, code and message fields.
  """
  @spec error_message(binary(), binary()) :: iodata()
  def error_message(code, value) do
    fields = ["SFATAL", 0, "VFATAL", 0, "C", code, 0, "M", value, 0, 0]
    packet_len = IO.iodata_length(fields) + 4

    [<<?E, packet_len::32>>, fields]
  end
130

131
  @doc """
  Encodes a list of already-prefixed error fields as an error packet.

  Fields are joined with null bytes and the body is closed with two null
  bytes. The result is iodata: the packet header followed by the body binary.
  """
  @spec encode_error_message(list()) :: iodata()
  def encode_error_message(message) when is_list(message) do
    joined_fields = Enum.join(message, <<0>>)
    body = IO.iodata_to_binary([joined_fields, 0, 0])

    [<<?E, byte_size(body) + 4::32>>, body]
  end
136

137
  @doc """
  Encodes every `{key, value}` entry of `ps` as a parameter status packet.

  Returns a list with one encoded packet per entry.
  """
  @spec encode_parameter_status(map) :: iodata()
  def encode_parameter_status(ps) do
    for {name, setting} <- ps, do: encode_pkt(:parameter_status, name, setting)
  end
143

144
  @doc """
  Encodes a single parameter status packet.

  The payload is the key and the value, each followed by a null byte. The
  result is iodata: the packet header followed by the payload.
  """
  @spec encode_pkt(:parameter_status, binary, binary) :: iodata()
  def encode_pkt(:parameter_status, key, value) do
    body = [key, <<0>>, value, <<0>>]

    [<<?S, IO.iodata_length(body) + 4::integer-32>>, body]
  end
150

151
  @doc """
  Generates backend key data as a `{header, payload}` pair.

  The payload is a unique integer encoded in 32 bits followed by 4 random
  bytes; the header is the `?K` packet header for that payload.
  """
  @spec backend_key_data() :: {iodata(), binary}
  def backend_key_data do
    unique_id = System.unique_integer([:positive, :monotonic])
    secret = :crypto.strong_rand_bytes(4)
    body = <<unique_id::integer-32, secret::binary>>

    {<<?K, byte_size(body) + 4::32>>, body}
  end
159

160
  @doc """
  Decodes a startup packet into a map with `:len`, `:payload` and `:tag` keys.

  The 32-bit length and the 4 protocol bytes are stripped and the remainder is
  decoded as key/value pairs. Anything that is too short or whose payload
  cannot be decoded yields `{:error, :bad_startup_payload}`.
  """
  @spec decode_startup_packet(binary()) :: {:ok, map()} | {:error, :bad_startup_payload}
  def decode_startup_packet(<<len::integer-32, _protocol::binary-4, rest::binary>>) do
    case decode_startup_packet_payload(rest) do
      {:ok, payload} -> {:ok, %{len: len, payload: payload, tag: :startup}}
      failure -> failure
    end
  end

  def decode_startup_packet(_), do: {:error, :bad_startup_payload}
174

175
  @doc """
  Returns `true` when any packet carries an error payload with code `25006`.

  Only payloads starting with `"SERROR"`, `"VERROR"`, `"C25006"` count.
  """
  @spec has_read_only_error?(list()) :: boolean
  def has_read_only_error?(pkts) do
    Enum.any?(pkts, &match?(%{payload: ["SERROR", "VERROR", "C25006" | _]}, &1))
  end
182

183
  @doc "Returns the parameter status message announcing the application name."
  @spec application_name :: binary()
  def application_name do
    @application_name
  end

  @doc "Returns the terminate message."
  @spec terminate_message :: binary()
  def terminate_message do
    @terminate_message
  end

  @doc "Returns the SCRAM-SHA-256 authentication request message."
  @spec scram_request :: iodata()
  def scram_request do
    @scram_request
  end

  @doc "Returns the flush message."
  @spec flush :: binary()
  def flush do
    @flush
  end

  @doc "Returns the sync message."
  @spec sync :: binary()
  def sync do
    @sync
  end

  @doc "Returns the authentication-ok message."
  @spec authentication_ok :: binary()
  def authentication_ok do
    @authentication_ok
  end

  @doc "Returns the ready-for-query message."
  @spec ready_for_query :: binary()
  def ready_for_query do
    @ready_for_query
  end

  @doc "Returns the SSL request message."
  @spec ssl_request :: binary()
  def ssl_request do
    @ssl_request
  end
206

207
  # Internal functions
208

209
  # Accumulates packets decoded from the front of `buffer` until fewer than a
  # header's worth of bytes remain or the next packet is incomplete. Packets
  # are collected in reverse and flipped once when returning. A bad packet
  # raises.
  @spec decode(binary(), list()) :: {:ok, [Pkt.t()], rest :: binary()}
  defp decode(buffer, decoded) when byte_size(buffer) >= @pkt_header_size do
    case decode_pkt(buffer) do
      {:ok, pkt, remaining} -> decode(remaining, [pkt | decoded])
      {:error, :incomplete} -> {:ok, Enum.reverse(decoded), buffer}
      {:error, :bad_packet} -> raise "bad packet: #{inspect(buffer)}"
    end
  end

  defp decode(buffer, decoded), do: {:ok, Enum.reverse(decoded), buffer}
224

225
  # Maps a message's leading tag byte to the atom naming the message type.
  # Bytes without a mapping yield `:undefined`.
  @spec tag(char()) :: atom()
  defp tag(?R), do: :authentication
  defp tag(?K), do: :backend_key_data
  defp tag(?2), do: :bind_complete
  defp tag(?3), do: :close_complete
  defp tag(?C), do: :command_complete
  defp tag(?d), do: :copy_data
  defp tag(?c), do: :copy_done
  defp tag(?G), do: :copy_in_response
  defp tag(?H), do: :copy_out_response
  defp tag(?W), do: :copy_both_response
  defp tag(?D), do: :data_row
  defp tag(?I), do: :empty_query_response
  defp tag(?E), do: :error_response
  defp tag(?V), do: :function_call_response
  defp tag(?n), do: :no_data
  defp tag(?N), do: :notice_response
  defp tag(?A), do: :notification_response
  defp tag(?t), do: :parameter_description
  defp tag(?S), do: :parameter_status
  defp tag(?1), do: :parse_complete
  defp tag(?s), do: :portal_suspended
  defp tag(?Z), do: :ready_for_query
  defp tag(?T), do: :row_description
  defp tag(?p), do: :password_message
  defp tag(_unknown), do: :undefined
255

256
  # Authentication payloads start with a 32-bit code selecting the request
  # type. Codes that carry extra data return it alongside the type atom; any
  # payload matching none of the known shapes is returned under `:undefined`.
  @spec decode_payload(:authentication, binary()) ::
          atom() | {atom(), binary()} | {:undefined, any()}
  defp decode_payload(:authentication, <<0::integer-32>>),
    do: :authentication_ok

  defp decode_payload(:authentication, <<2::integer-32>>),
    do: :authentication_kerberos_v5

  defp decode_payload(:authentication, <<3::integer-32>>),
    do: :authentication_cleartext_password

  defp decode_payload(:authentication, <<5::integer-32, salt::binary-4>>),
    do: {:authentication_md5_password, salt}

  defp decode_payload(:authentication, <<6::integer-32>>),
    do: :authentication_scm_credential

  defp decode_payload(:authentication, <<7::integer-32>>),
    do: :authentication_gss

  defp decode_payload(:authentication, <<8::integer-32, data::binary>>),
    do: {:authentication_gss_continue, data}

  defp decode_payload(:authentication, <<9::integer-32>>),
    do: :authentication_sspi

  defp decode_payload(:authentication, <<10::integer-32, mechanisms::binary>>),
    do: {:authentication_sasl_password, mechanisms}

  defp decode_payload(:authentication, <<11::integer-32, server_first::binary>>),
    do: {:authentication_server_first_message, server_first}

  defp decode_payload(:authentication, <<12::integer-32, server_final::binary>>),
    do: {:authentication_server_final_message, server_final}

  defp decode_payload(:authentication, unrecognized),
    do: {:undefined, unrecognized}
297

298
  # Decodes a parameter status payload ("key\0value\0") into `{key, value}`.
  #
  # Splitting with `trim: true` discards empty segments, so a well-formed
  # status whose value is the empty string ("key\0\0") leaves only the key
  # behind. That exact shape is recognised explicitly and decoded with an empty
  # value instead of being reported as `:undefined`. Every other payload is
  # decoded exactly as before.
  @spec decode_payload(:parameter_status, binary()) :: {binary(), binary()} | :undefined
  defp decode_payload(:parameter_status, payload) do
    case String.split(payload, <<0>>, trim: true) do
      [key, value] ->
        {key, value}

      [key] ->
        if payload == <<key::binary, 0, 0>>, do: {key, ""}, else: :undefined

      _ ->
        :undefined
    end
  end
305

306
  # Backend key data is exactly two 32-bit integers: the process id and the
  # secret key. Both are decoded as unsigned integers, so the spec declares
  # them as integers rather than a binary key.
  @spec decode_payload(:backend_key_data, binary()) :: %{
          pid: non_neg_integer(),
          key: non_neg_integer()
        }
  defp decode_payload(:backend_key_data, <<pid::integer-32, key::integer-32>>),
    do: %{pid: pid, key: key}
309

310
  # The ready-for-query payload is a single status byte: `I`, `T` or `E`.
  # Any other payload raises a `CaseClauseError`.
  @spec decode_payload(:ready_for_query, binary()) :: :idle | :transaction | :error
  defp decode_payload(:ready_for_query, status) do
    case status do
      <<?I>> -> :idle
      <<?T>> -> :transaction
      <<?E>> -> :error
    end
  end
318

319
  # Parse-complete carries no payload.
  @spec decode_payload(:parse_complete, binary()) :: :parse_complete
  defp decode_payload(:parse_complete, ""), do: :parse_complete

  # Parameter description: a 16-bit count followed by 32-bit type OIDs.
  # The count is an unsigned 16-bit value and may be zero.
  @spec decode_payload(:parameter_description, binary()) :: {non_neg_integer(), list()}
  defp decode_payload(:parameter_description, <<count::integer-16, rest::binary>>),
    do: {count, decode_parameter_description(rest, [])}

  # Row description: a 16-bit field count followed by the field definitions.
  # Returns `{:error, :decode}` when the fields cannot be decoded.
  @spec decode_payload(:row_description, binary()) :: [map()] | {:error, :decode}
  defp decode_payload(:row_description, <<count::integer-16, rest::binary>>),
    do: decode_row_description(count, rest, [])

  # Data rows are not decoded; the payload is discarded.
  @spec decode_payload(:data_row, binary()) :: nil
  defp decode_payload(:data_row, _payload), do: nil
332

333
  # Splits an error response into its non-empty, null-separated fields. Each
  # field keeps its leading one-byte field-type code.
  # https://www.postgresql.org/docs/current/protocol-error-fields.html
  @spec decode_payload(:error_response, binary()) :: [binary()]
  defp decode_payload(:error_response, payload) do
    String.split(payload, <<0>>, trim: true)
  end
337

338
  # Decodes a SCRAM-SHA-256 initial response sent by the client.
  #
  # After the mechanism name, its null terminator and a 32-bit length comes
  # the client-first message. Its first three bytes are the gs2 header, which
  # is translated to the base64 value stored under "c"; the remainder is
  # parsed as comma-separated `key=value` attributes.
  #
  # Returns `:undefined` when the attributes cannot be parsed or when the gs2
  # header is not one of the two supported values. The header comes from the
  # client, so an unsupported one must not raise.
  @spec decode_payload(:password_message, binary()) ::
          {:scram_sha_256, map()} | {:md5, binary()} | :undefined
  defp decode_payload(
         :password_message,
         <<"SCRAM-SHA-256", 0, _::32, gs2_header::binary-3, bin::binary>>
       ) do
    channel =
      case gs2_header do
        "n,," -> "biws"
        "y,," -> "eSws"
        _ -> nil
      end

    case kv_to_map(bin) do
      {:ok, map} when is_binary(channel) ->
        {:scram_sha_256, Map.put(map, "c", channel)}

      _ ->
        :undefined
    end
  end
358

359
  # An MD5 password message is the digest (starting with "md5") followed by a
  # single terminating null byte and nothing else.
  defp decode_payload(:password_message, "md5" <> _ = message) do
    with [digest, ""] <- :binary.split(message, <<0>>) do
      {:md5, digest}
    else
      _ -> :undefined
    end
  end
365

366
  # Any other password message is treated as comma-separated `key=value`
  # attributes; when none can be parsed the payload is `:undefined`.
  @spec decode_payload(:password_message, binary()) ::
          {:first_msg_response, map()} | :undefined
  defp decode_payload(:password_message, bin) do
    with {:ok, attrs} <- kv_to_map(bin) do
      {:first_msg_response, attrs}
    else
      {:error, _reason} -> :undefined
    end
  end

  # Fallback for every tag/payload combination not handled above.
  defp decode_payload(_tag, _payload), do: :undefined
376

377
  # Collects every `key=value` pair found in `bin` into a map. Keys are runs of
  # word characters and values run up to the next comma. Fails when no pair is
  # found at all.
  @spec kv_to_map(binary()) :: {:ok, map()} | {:error, String.t()}
  defp kv_to_map(bin) do
    case Regex.scan(~r/(\w+)=([^,]*)/, bin) do
      [] ->
        {:error, "invalid key value string"}

      matches ->
        {:ok, Map.new(matches, fn [_whole, key, value] -> {key, value} end)}
    end
  end
386

387
  # Decodes `remaining` field definitions from `data`, accumulating them in
  # reverse. Each field is a null-terminated name followed by fixed-width
  # attributes, all read as unsigned integers. Decoding succeeds only when the
  # count reaches zero exactly as the data runs out; any failure along the way
  # yields `{:error, :decode}`.
  @spec decode_row_description(non_neg_integer(), binary(), list()) :: [map()] | {:error, :decode}
  defp decode_row_description(0, "", fields), do: Enum.reverse(fields)

  defp decode_row_description(remaining, data, fields) do
    with {:ok, name, attributes} <- decode_string(data),
         <<table_oid::integer-32, attr_number::integer-16, type_oid::integer-32,
           type_size::integer-16, type_modifier::integer-32, format_code::integer-16,
           tail::binary>> <- attributes,
         {:ok, format} <- decode_format_code(format_code) do
      field = %{
        name: name,
        type_info: PgType.type(type_oid),
        table_oid: table_oid,
        attr_number: attr_number,
        data_type_oid: type_oid,
        data_type_size: type_size,
        type_modifier: type_modifier,
        format: format
      }

      decode_row_description(remaining - 1, tail, [field | fields])
    else
      _ -> {:error, :decode}
    end
  end
412

413
  # Translates a numeric format code: 0 is text, 1 is binary, anything else is
  # an error.
  @spec decode_format_code(0 | 1) :: {:ok, :text | :binary} | {:error, :unknown_format_code}
  defp decode_format_code(0), do: {:ok, :text}
  defp decode_format_code(1), do: {:ok, :binary}
  defp decode_format_code(_other), do: {:error, :unknown_format_code}
421

422
  # Reads consecutive 32-bit OIDs until the binary is exhausted and returns
  # them in the order they appeared.
  @spec decode_parameter_description(binary(), list()) :: [pos_integer()]
  defp decode_parameter_description(<<oid::integer-32, remaining::binary>>, oids) do
    decode_parameter_description(remaining, [oid | oids])
  end

  defp decode_parameter_description(<<>>, oids), do: Enum.reverse(oids)
427

428
  # The startup packet payload is a flat sequence of null-separated strings
  # that alternate between keys and values. The value of "options" is further
  # decoded as a query string; every other value is kept as-is.
  @spec decode_startup_packet_payload(binary()) :: {:ok, map()} | {:error, :bad_startup_payload}
  defp decode_startup_packet_payload(payload) do
    tokens = String.split(payload, <<0>>, trim: true)

    case rem(length(tokens), 2) do
      # An odd number of tokens leaves a key without a value: malformed.
      1 ->
        {:error, :bad_startup_payload}

      _ ->
        params =
          tokens
          |> Enum.chunk_every(2)
          |> Map.new(fn
            ["options" = key, raw_options] -> {key, URI.decode_query(raw_options)}
            [key, value] -> {key, value}
          end)

        # Validation is deliberately light: `user` is the only field relied on,
        # so its absence is the only thing that makes the payload bad.
        case params do
          %{"user" => _} -> {:ok, params}
          _ -> {:error, :bad_startup_payload}
        end
    end
  end
453
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc