• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supavisor / 19370957114

14 Nov 2025 04:30PM UTC coverage: 62.682% (+1.4%) from 61.246%
19370957114

Pull #744

github

web-flow
Merge fd252a012 into 0224a24c8
Pull Request #744: fix(defrag): improve statems, caching, logs, circuit breaking

592 of 785 new or added lines in 22 files covered. (75.41%)

18 existing lines in 5 files now uncovered.

1809 of 2886 relevant lines covered (62.68%)

4508.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.14
/lib/supavisor/protocol/server.ex
1
defmodule Supavisor.Protocol.Server do
2
  @moduledoc """
3
  The Supavisor.Protocol.Server module is responsible for decoding data received from the PostgreSQL server. It provides several functions to decode payloads from different types of messages.
4

5
  Message Formats: https://www.postgresql.org/docs/current/protocol-message-formats.html
6
  Error codes https://www.postgresql.org/docs/current/errcodes-appendix.html
7
  """
8
  require Logger
9
  alias Supavisor.Protocol.{Debug, PgType}
10

11
  @pkt_header_size 5
12
  @authentication_ok <<?R, 8::32, 0::32>>
13
  @ready_for_query <<?Z, 5::32, ?I>>
14
  @ssl_request <<8::32, 1234::16, 5679::16>>
15
  @scram_request <<?R, 23::32, 10::32, "SCRAM-SHA-256", 0, 0>>
16
  @msg_cancel_header <<16::32, 1234::16, 5678::16>>
17
  @application_name <<?S, 31::32, "application_name", 0, "Supavisor", 0>>
18
  @terminate_message <<?X, 4::32>>
19
  @parse_complete_message <<?1, 4::32>>
20
  @bind_complete_message <<?2, 4::32>>
21
  @close_complete_message <<?3, 4::32>>
22
  @flush <<?H, 4::integer-32>>
23
  @sync <<?S, 4::integer-32>>
24

25
  defmodule Pkt do
26
    @moduledoc """
27
    Represents a packet structure with tag, length, and payload fields.
28

29
    The original binary can be found on the bin field.
30
    """
31

32
    defstruct([:tag, :len, :payload, :bin])
33

34
    @type t :: %Pkt{
35
            tag: atom,
36
            len: integer,
37
            payload: any,
38
            bin: binary
39
          }
40

41
    defimpl Inspect do
42
      def inspect(pkt, _opts) do
43
        case pkt.bin do
5,591✔
44
          bin when is_binary(bin) ->
45
            Debug.packet_to_string(bin, :backend)
5,591✔
46

47
          _ ->
×
48
            "#Supavisor.Protocol.Server.Pkt<malformed>"
49
        end
50
      end
51
    end
52
  end
53

54
  defmacro cancel_message(pid, key) do
55
    quote do
56
      <<unquote(@msg_cancel_header)::binary, unquote(pid)::32, unquote(key)::32>>
57
    end
58
  end
59

60
  defmacro ssl_request_message, do: @ssl_request
2✔
61
  defmacro parse_complete_message, do: @parse_complete_message
×
62
  defmacro bind_complete_message, do: @bind_complete_message
×
63
  defmacro close_complete_message, do: @close_complete_message
×
64

65
  defmacro parameter_description_message_shape do
66
    quote do
67
      <<?t, _rest::binary>>
68
    end
69
  end
70

71
  @spec decode(iodata()) :: {:ok, [Pkt.t()], rest :: binary()}
72
  def decode(data), do: decode(data, [])
898✔
73

74
  @spec decode_pkt(binary()) ::
75
          {:ok, Pkt.t(), binary()} | {:error, :bad_packet} | {:error, :incomplete}
76
  def decode_pkt(<<char::integer-8, pkt_len::integer-32, rest::binary>>) do
77
    payload_len = pkt_len - 4
6,878✔
78

79
    case rest do
6,878✔
80
      <<bin_payload::binary-size(payload_len), rest2::binary>> ->
81
        tag = tag(char)
6,877✔
82
        payload = decode_payload(tag, bin_payload)
6,877✔
83

84
        pkt = %Pkt{
6,877✔
85
          tag: tag,
86
          len: pkt_len + 1,
87
          payload: payload,
88
          bin: <<char, pkt_len::32, bin_payload::binary>>
89
        }
90

91
        {:ok, pkt, rest2}
6,877✔
92

93
      _ ->
1✔
94
        {:error, :incomplete}
95
    end
96
  end
97

98
  def decode_pkt(_), do: {:error, :bad_packet}
1✔
99

100
  @spec decode_string(binary()) :: {:ok, binary(), binary()} | {:error, :not_null_terminated}
101
  def decode_string(bin) do
102
    case :binary.match(bin, <<0>>) do
306✔
103
      :nomatch ->
3✔
104
        {:error, :not_null_terminated}
105

106
      {pos, 1} ->
107
        {string, <<0, rest::binary>>} = :erlang.split_binary(bin, pos)
303✔
108
        {:ok, string, rest}
303✔
109
    end
110
  end
111

112
  @spec md5_request(<<_::32>>) :: iodata()
113
  def md5_request(salt), do: [<<?R, 12::32, 5::32>>, salt]
1✔
114

115
  @spec exchange_first_message(binary, binary | boolean, pos_integer) :: binary
116
  def exchange_first_message(nonce, salt \\ false, iterations \\ 4096) do
117
    server_nonce =
612✔
118
      16
119
      |> :pgo_scram.get_nonce()
120
      |> Base.encode64()
121

122
    secret = if salt, do: salt, else: server_nonce
612✔
123
    "r=#{nonce <> server_nonce},s=#{secret},i=#{iterations}"
612✔
124
  end
125

126
  @spec exchange_message(:first | :final, binary()) :: iodata()
127
  def exchange_message(type, message) do
128
    code =
1,217✔
129
      case type do
130
        :first -> 11
612✔
131
        :final -> 12
605✔
132
      end
133

134
    [<<?R, byte_size(message) + 8::32, code::32>>, message]
135
  end
136

137
  @spec error_message(binary(), binary()) :: iodata()
138
  def error_message(code, value) do
139
    message = ["SFATAL", 0, "VFATAL", 0, "C", code, 0, "M", value, 0, 0]
15✔
140
    [<<?E, IO.iodata_length(message) + 4::32>>, message]
141
  end
142

143
  @spec encode_error_message(map()) :: iodata()
144
  def encode_error_message(error_map) when is_map(error_map) do
145
    sorted_fields = Enum.sort(error_map)
4✔
146
    message = [Enum.map(sorted_fields, fn {char, content} -> [char, content, <<0>>] end), <<0>>]
4✔
147
    [<<?E, IO.iodata_length(message) + 4::32>>, message]
148
  end
149

150
  @spec encode_parameter_status(map) :: iodata()
151
  def encode_parameter_status(ps) do
152
    for {key, value} <- ps do
33✔
153
      encode_pkt(:parameter_status, key, value)
154
    end
155
  end
156

157
  @spec encode_pkt(:parameter_status, binary, binary) :: iodata()
158
  def encode_pkt(:parameter_status, key, value) do
159
    payload = [key, <<0>>, value, <<0>>]
443✔
160
    len = IO.iodata_length(payload) + 4
443✔
161
    [<<?S, len::integer-32>>, payload]
162
  end
163

164
  @spec backend_key_data() :: {iodata(), binary}
165
  def backend_key_data do
166
    pid = System.unique_integer([:positive, :monotonic])
564✔
167
    key = :crypto.strong_rand_bytes(4)
564✔
168
    payload = <<pid::integer-32, key::binary>>
564✔
169
    len = IO.iodata_length(payload) + 4
564✔
170
    {<<?K, len::32>>, payload}
171
  end
172

173
  @spec decode_startup_packet(binary()) :: {:ok, map()} | {:error, :bad_startup_payload}
174
  def decode_startup_packet(<<len::integer-32, _protocol::binary-4, rest::binary>>) do
175
    with {:ok, payload} <- decode_startup_packet_payload(rest) do
618✔
176
      pkt = %{
617✔
177
        len: len,
178
        payload: payload,
179
        tag: :startup
180
      }
181

182
      {:ok, pkt}
183
    end
184
  end
185

186
  def decode_startup_packet(_), do: {:error, :bad_startup_payload}
7✔
187

188
  @spec application_name :: binary()
189
  def application_name, do: @application_name
1✔
190

191
  @spec terminate_message :: binary()
192
  def terminate_message, do: @terminate_message
243✔
193

194
  @spec scram_request :: iodata()
195
  def scram_request, do: @scram_request
614✔
196

197
  @spec flush :: binary()
198
  def flush, do: @flush
1✔
199

200
  @spec sync :: binary()
201
  def sync, do: @sync
1✔
202

203
  @spec authentication_ok :: binary()
204
  def authentication_ok, do: @authentication_ok
605✔
205

206
  @spec ready_for_query :: binary()
207
  def ready_for_query, do: @ready_for_query
21,233✔
208

209
  @spec ssl_request :: binary()
210
  def ssl_request, do: @ssl_request
2✔
211

212
  # Internal functions
213

214
  @spec decode(binary(), list()) :: {:ok, [Pkt.t()], rest :: binary()}
215
  defp decode(data, acc) when byte_size(data) >= @pkt_header_size do
216
    case decode_pkt(data) do
5,590✔
217
      {:ok, pkt, rest} ->
218
        decode(rest, [pkt | acc])
5,590✔
219

220
      {:error, :incomplete} ->
221
        {:ok, Enum.reverse(acc), data}
×
222

223
      {:error, :bad_packet} ->
224
        raise "bad packet: #{inspect(data)}"
×
225
    end
226
  end
227

228
  defp decode(data, acc), do: {:ok, Enum.reverse(acc), data}
898✔
229

230
  @spec tag(char()) :: atom()
231
  defp tag(char) do
232
    case char do
6,877✔
233
      ?R -> :authentication
1,202✔
234
      ?K -> :backend_key_data
295✔
235
      ?2 -> :bind_complete
1✔
236
      ?3 -> :close_complete
1✔
237
      ?C -> :command_complete
3✔
238
      ?d -> :copy_data
1✔
239
      ?c -> :copy_done
1✔
240
      ?G -> :copy_in_response
1✔
241
      ?H -> :copy_out_response
1✔
242
      ?W -> :copy_both_response
1✔
243
      ?D -> :data_row
3✔
244
      ?I -> :empty_query_response
1✔
245
      ?E -> :error_response
6✔
246
      ?V -> :function_call_response
1✔
247
      ?n -> :no_data
1✔
248
      ?N -> :notice_response
1✔
249
      ?A -> :notification_response
1✔
250
      ?t -> :parameter_description
2✔
251
      ?S -> :parameter_status
3,816✔
252
      ?1 -> :parse_complete
1✔
253
      ?s -> :portal_suspended
1✔
254
      ?Z -> :ready_for_query
300✔
255
      ?T -> :row_description
7✔
256
      ?p -> :password_message
1,226✔
257
      _ -> :undefined
3✔
258
    end
259
  end
260

261
  @spec decode_payload(:authentication, binary()) ::
262
          atom() | {atom(), binary()} | {:undefined, any()}
263
  defp decode_payload(:authentication, payload) do
264
    case payload do
1,202✔
265
      <<0::integer-32>> ->
298✔
266
        :authentication_ok
267

268
      <<2::integer-32>> ->
1✔
269
        :authentication_kerberos_v5
270

271
      <<3::integer-32>> ->
2✔
272
        :authentication_cleartext_password
273

274
      <<5::integer-32, salt::binary-4>> ->
3✔
275
        {:authentication_md5_password, salt}
276

277
      <<6::integer-32>> ->
1✔
278
        :authentication_scm_credential
279

280
      <<7::integer-32>> ->
1✔
281
        :authentication_gss
282

283
      <<8::integer-32, rest::binary>> ->
1✔
284
        {:authentication_gss_continue, rest}
285

286
      <<9::integer-32>> ->
1✔
287
        :authentication_sspi
288

289
      <<10::integer-32, methods_b::binary>> ->
299✔
290
        {:authentication_sasl_password, methods_b}
291

292
      <<11::integer-32, server_first::binary>> ->
298✔
293
        {:authentication_server_first_message, server_first}
294

295
      <<12::integer-32, server_final_msg::binary>> ->
296✔
296
        {:authentication_server_final_message, server_final_msg}
297

298
      other ->
1✔
299
        {:undefined, other}
300
    end
301
  end
302

303
  @spec decode_payload(:parameter_status, binary()) :: {binary(), binary()} | :undefined
304
  defp decode_payload(:parameter_status, payload) do
305
    case String.split(payload, <<0>>, trim: true) do
3,816✔
306
      [k, v] -> {k, v}
3,811✔
307
      _ -> :undefined
5✔
308
    end
309
  end
310

311
  @spec decode_payload(:backend_key_data, binary()) :: %{pid: pos_integer(), key: binary()}
312
  defp decode_payload(:backend_key_data, <<pid::integer-32, key::integer-32>>),
313
    do: %{pid: pid, key: key}
295✔
314

315
  @spec decode_payload(:ready_for_query, binary()) :: :idle | :transaction | :error
316
  defp decode_payload(:ready_for_query, payload) do
317
    case payload do
300✔
318
      "I" -> :idle
298✔
319
      "T" -> :transaction
1✔
320
      "E" -> :error
1✔
321
    end
322
  end
323

324
  @spec decode_payload(:parse_complete, binary()) :: :parse_complete
325
  defp decode_payload(:parse_complete, ""), do: :parse_complete
1✔
326

327
  @spec decode_payload(:parameter_description, binary()) :: {pos_integer(), list()}
328
  defp decode_payload(:parameter_description, <<count::integer-16, rest::binary>>),
2✔
329
    do: {count, decode_parameter_description(rest, [])}
330

331
  @spec decode_payload(:row_description, binary()) :: list()
332
  defp decode_payload(:row_description, <<count::integer-16, rest::binary>>),
333
    do: decode_row_description(count, rest, [])
7✔
334

335
  @spec decode_payload(:data_row, binary()) :: nil
336
  defp decode_payload(:data_row, _payload), do: nil
3✔
337

338
  # https://www.postgresql.org/docs/current/protocol-error-fields.html
339
  @spec decode_payload(:error_response, binary()) :: %{String.t() => String.t()}
340
  defp decode_payload(:error_response, payload) do
341
    fields = String.split(payload, <<0>>, trim: true)
6✔
342

343
    Enum.reduce(fields, %{}, fn field, acc ->
6✔
344
      case field do
33✔
345
        <<char::binary-1, content::binary>> -> Map.put(acc, char, content)
33✔
NEW
346
        _ -> acc
×
347
      end
348
    end)
349
  end
350

351
  @spec decode_payload(:password_message, binary()) ::
352
          {:scram_sha_256, map()} | {:md5, binary()} | :undefined
353
  defp decode_payload(
354
         :password_message,
355
         <<"SCRAM-SHA-256", 0, _::32, channel::binary-3, bin::binary>>
356
       ) do
357
    case kv_to_map(bin) do
614✔
358
      {:ok, map} ->
359
        channel =
613✔
360
          case channel do
361
            "n,," -> "biws"
612✔
362
            "y,," -> "eSws"
1✔
363
          end
364

365
        {:scram_sha_256, Map.put(map, "c", channel)}
366

367
      {:error, _} ->
1✔
368
        :undefined
369
    end
370
  end
371

372
  defp decode_payload(:password_message, "md5" <> _ = bin) do
373
    case :binary.split(bin, <<0>>) do
2✔
374
      [digest, ""] -> {:md5, digest}
1✔
375
      _ -> :undefined
1✔
376
    end
377
  end
378

379
  @spec decode_payload(:password_message, binary()) ::
380
          {:first_msg_response, map()} | :undefined
381
  defp decode_payload(:password_message, bin) do
382
    case kv_to_map(bin) do
610✔
383
      {:ok, map} -> {:first_msg_response, map}
608✔
384
      {:error, _} -> :undefined
2✔
385
    end
386
  end
387

388
  defp decode_payload(_, _), do: :undefined
19✔
389

390
  @spec kv_to_map(binary()) :: {:ok, map()} | {:error, String.t()}
391
  defp kv_to_map(bin) do
392
    Regex.scan(~r/(\w+)=([^,]*)/, bin)
393
    |> Map.new(fn [_, k, v] -> {k, v} end)
3,051✔
394
    |> case do
1,224✔
395
      map when map_size(map) > 0 -> {:ok, map}
1,221✔
396
      _ -> {:error, "invalid key value string"}
3✔
397
    end
398
  end
399

400
  @spec decode_row_description(non_neg_integer(), binary(), list()) :: [map()] | {:error, :decode}
401
  defp decode_row_description(0, "", acc), do: Enum.reverse(acc)
5✔
402

403
  defp decode_row_description(count, rest, acc) do
404
    with {:ok, field_name,
6✔
405
          <<table_oid::integer-32, attr_num::integer-16, data_type_oid::integer-32,
406
            data_type_size::integer-16, type_modifier::integer-32, format_code::integer-16,
407
            tail::binary>>} <- decode_string(rest),
6✔
408
         {:ok, format} <- decode_format_code(format_code) do
5✔
409
      field = %{
4✔
410
        name: field_name,
411
        type_info: PgType.type(data_type_oid),
412
        table_oid: table_oid,
413
        attr_number: attr_num,
414
        data_type_oid: data_type_oid,
415
        data_type_size: data_type_size,
416
        type_modifier: type_modifier,
417
        format: format
418
      }
419

420
      decode_row_description(count - 1, tail, [field | acc])
4✔
421
    else
422
      _ -> {:error, :decode}
423
    end
424
  end
425

426
  @spec decode_format_code(0 | 1) :: {:ok, :text | :binary} | {:error, :unknown_format_code}
427
  defp decode_format_code(code) do
428
    case code do
5✔
429
      0 -> {:ok, :text}
3✔
430
      1 -> {:ok, :binary}
1✔
431
      _ -> {:error, :unknown_format_code}
1✔
432
    end
433
  end
434

435
  @spec decode_parameter_description(binary(), list()) :: [pos_integer()]
436
  defp decode_parameter_description("", acc), do: Enum.reverse(acc)
2✔
437

438
  defp decode_parameter_description(<<oid::integer-32, rest::binary>>, acc),
439
    do: decode_parameter_description(rest, [oid | acc])
2✔
440

441
  # The startup packet payload is a list of key/value pairs, separated by null bytes
442
  @spec decode_startup_packet_payload(binary()) :: {:ok, map()} | {:error, :bad_startup_payload}
443
  defp decode_startup_packet_payload(payload) do
444
    fields = String.split(payload, <<0>>, trim: true)
618✔
445

446
    # If the number of fields is odd, then the payload is malformed
447
    if rem(length(fields), 2) == 1 do
618✔
448
      {:error, :bad_startup_payload}
449
    else
450
      map =
618✔
451
        fields
452
        |> Enum.chunk_every(2)
453
        |> Enum.map(fn
454
          ["options" = k, v] -> {k, URI.decode_query(v)}
1✔
455
          [k, v] -> {k, v}
2,150✔
456
        end)
457
        |> Map.new()
458

459
      # We only do light validation on the fields in the payload. The only field we use at the
460
      # moment is `user`. If that's missing, this is a bad payload.
461
      if Map.has_key?(map, "user") do
618✔
462
        {:ok, map}
463
      else
464
        Logger.error("Bad startup payload: #{inspect(payload, limit: 200)}")
1✔
465
        {:error, :bad_startup_payload}
466
      end
467
    end
468
  end
469
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc