• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / 26643330875

29 May 2026 02:33PM UTC coverage: 91.458%. First build
26643330875

Pull #1925

github

web-flow
Merge 74752cbc2 into 5c93595e4
Pull Request #1925: fix: prevent usage of select with wildcard; handle edge cases

22 of 25 new or added lines in 1 file covered. (88.0%)

3255 of 3559 relevant lines covered (91.46%)

4687.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.17
/lib/extensions/postgres_cdc_rls/subscriptions.ex
1
defmodule Extensions.PostgresCdcRls.Subscriptions do
2
  @moduledoc """
3
  This module consolidates subscriptions handling
4
  """
5
  use Realtime.Logs
6

7
  import Postgrex, only: [transaction: 3, query: 3, rollback: 2]
8

9
  @type conn() :: Postgrex.conn()
10
  @type filter :: {binary, binary, binary}
11
  @type subscription_params ::
12
          {action_filter :: binary, schema :: binary, table :: binary, [filter], selected_columns :: [binary] | nil}
13
  @type subscription_list :: [
14
          %{id: binary, claims: map, subscription_params: subscription_params}
15
        ]
16

17
  @filter_types ["eq", "neq", "lt", "lte", "gt", "gte", "in"]
18

19
  @doc """
  Inserts every entry of `subscription_list` into `realtime.subscription` inside a
  single transaction (10 second timeout).

  For each entry whose insert returns at least one row, `{:subscribed, {caller, id}}`
  is sent to `manager`. Note that this message is sent while the transaction is still
  open, so a rollback triggered by a later entry does not retract it.

  Returns `{:ok, results}` with one `Postgrex.Result` per entry, or `{:error, reason}`:

    * `{:subscription_insert_failed, message}` - an insert matched no table or raised
      a database error; the whole transaction is rolled back
    * a `DBConnection.ConnectionError` exception - the connection failed
    * `{:exit, reason}` - the connection process exited
  """
  @spec create(conn(), String.t(), subscription_list, pid(), pid()) ::
          {:ok, [Postgrex.Result.t()]}
          | {:error, Exception.t() | {:exit, term} | {:subscription_insert_failed, String.t()}}

  def create(conn, publication, subscription_list, manager, caller) do
    transaction(
      conn,
      fn trans_conn ->
        Enum.map(subscription_list, &insert_subscription(trans_conn, publication, &1, manager, caller))
      end,
      timeout: 10_000
    )
  rescue
    e in DBConnection.ConnectionError -> {:error, e}
  catch
    :exit, reason -> {:error, {:exit, reason}}
  end

  # Inserts a single subscription; rolls the surrounding transaction back when the
  # insert yields no rows or the query fails.
  defp insert_subscription(conn, publication, %{id: id, claims: claims, subscription_params: params}, manager, caller) do
    case query(conn, publication, id, claims, params) do
      {:ok, %{num_rows: num} = result} when num > 0 ->
        send(manager, {:subscribed, {caller, id}})
        result

      {:ok, _} ->
        msg =
          "Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [#{params_to_log(params)}]"

        rollback(conn, {:subscription_insert_failed, msg})

      {:error, exception} ->
        msg =
          "Unable to subscribe to changes with given parameters. An exception happened so please check your connect parameters: [#{params_to_log(params)}]. Exception: #{Exception.message(exception)}"

        rollback(conn, {:subscription_insert_failed, msg})
    end
  end
56

57
  # Upserts one `realtime.subscription` row per table of `publication` that matches
  # the requested schema/table, returning the ids of the affected rows.
  #
  # `schema` and `table` are matched by exact equality unless they are the literal
  # `*`, which matches everything. They are deliberately NOT used as LIKE patterns:
  # with `ESCAPE ''` the characters `_` and `%` cannot be escaped, so a name such as
  # `user_data` would also match `userXdata`, and `%` would act as an undeclared
  # wildcard.
  #
  # Parameters: $1 publication, $2 schema, $3 table, $4 subscription id, $5 claims,
  # $6 filters, $7 action filter, $8 selected columns.
  defp query(conn, publication, id, claims, subscription_params) do
    {action_filter, schema, table, filters, selected_columns} = subscription_params

    sql = """
    with sub_tables as (
        select
        rr.entity
        from
        pg_publication_tables pub,
        lateral (
        select
        format('%I.%I', pub.schemaname, pub.tablename)::regclass entity
        ) rr
        where
        pub.pubname = $1
        and ($2::text = '*' or pub.schemaname = $2::text)
        and ($3::text = '*' or pub.tablename = $3::text)
     )
     insert into realtime.subscription as x(
        subscription_id,
        entity,
        filters,
        claims,
        action_filter,
        selected_columns
      )
      select
        $4::text::uuid,
        sub_tables.entity,
        $6,
        $5,
        $7,
        $8
      from
        sub_tables
        on conflict
        -- coalesce needed: NULL != NULL in unique constraints; NULL selected_columns means all columns
        (subscription_id, entity, filters, action_filter, coalesce(selected_columns, '{}'))
        do update set
        claims = excluded.claims,
        created_at = now()
      returning
         id
    """

    query(conn, sql, [publication, schema, table, id, claims, filters, action_filter, selected_columns])
  end
100

101
  # Renders the subscription params tuple as a `key: value` list joined by ", ",
  # for inclusion in error messages.
  defp params_to_log({action_filter, schema, table, filters, selected_columns}) do
    labelled = [event: action_filter, schema: schema, table: table, filters: filters, select: selected_columns]

    rendered = for {label, value} <- labelled, do: "#{label}: #{to_log(value)}"
    Enum.join(rendered, ", ")
  end
105

106
  @doc """
  Deletes the rows of `realtime.subscription` whose `subscription_id` equals `id`.

  Returns the query result unchanged on success. A query error or an exit of the
  connection process is logged as `SubscriptionDeletionFailed` and returned as
  `{:error, reason}` / `{:error, {:exit, reason}}`.
  """
  @spec delete(conn(), String.t()) :: {:ok, Postgrex.Result.t()} | {:error, any()}
  def delete(conn, id) do
    Logger.debug("Delete subscription")
    statement = "delete from realtime.subscription where subscription_id = $1"

    # Anything that is not an error tuple is handed back to the caller as-is.
    with {:error, reason} <- query(conn, statement, [id]) do
      log_error("SubscriptionDeletionFailed", reason)
      {:error, reason}
    end
  catch
    :exit, reason ->
      log_error("SubscriptionDeletionFailed", {:exit, reason})
      {:error, {:exit, reason}}
  end
124

125
  @doc """
  Deletes every row of `realtime.subscription`.

  Query errors and exits of the connection process are logged as
  `SubscriptionDeletionFailed` instead of being returned to the caller.
  """
  @spec delete_all(conn()) :: :ok
  def delete_all(conn) do
    Logger.debug("Delete all subscriptions")
    outcome = query(conn, "delete from realtime.subscription;", [])

    case outcome do
      {:error, reason} -> log_error("SubscriptionDeletionFailed", reason)
      {:ok, _} -> :ok
    end
  catch
    :exit, reason -> log_error("SubscriptionDeletionFailed", {:exit, reason})
  end
136

137
  @doc """
  Deletes the rows of `realtime.subscription` whose `subscription_id` is one of `ids`.

  The query result is returned unchanged.
  """
  @spec delete_multi(conn(), [Ecto.UUID.t()]) ::
          {:ok, Postgrex.Result.t()} | {:error, Exception.t()}
  def delete_multi(conn, ids) do
    Logger.debug("Delete multi ids subscriptions")

    query(conn, "delete from realtime.subscription where subscription_id = ANY($1::uuid[])", [ids])
  end
144

145
  @doc """
  Deletes every row of `realtime.subscription`, but only if that table exists.

  The existence check and the delete run in a single anonymous `do` block on the
  database. Query errors and exits of the connection process are logged as
  `SubscriptionCleanupFailed` instead of being returned to the caller.
  """
  @spec delete_all_if_table_exists(conn()) :: :ok
  def delete_all_if_table_exists(conn) do
    cleanup_sql = "do $$
        begin
          if exists (
            select 1
            from pg_tables
            where schemaname = 'realtime'
              and tablename  = 'subscription'
          )
          then
            delete from realtime.subscription;
          end if;
      end $$"

    case query(conn, cleanup_sql, []) do
      {:error, reason} -> log_error("SubscriptionCleanupFailed", reason)
      {:ok, _} -> :ok
    end
  catch
    :exit, reason -> log_error("SubscriptionCleanupFailed", {:exit, reason})
  end
169

170
  @doc """
  Builds a lookup of the tables that belong to `publication`.

  The returned map has three kinds of keys, each pointing at a sorted list of
  table oids:

    * `{schema, table}` - the single table
    * `{schema}` - every published table of that schema
    * `{"*"}` - every published table

  An empty map is returned when the query fails or its result has an unexpected
  shape.
  """
  @spec fetch_publication_tables(conn(), String.t()) ::
          %{optional({String.t()} | {String.t(), String.t()}) => [integer()]}
  def fetch_publication_tables(conn, publication) do
    sql = "select
    schemaname, tablename, format('%I.%I', schemaname, tablename)::regclass as oid
    from pg_publication_tables where pubname = $1"

    case query(conn, sql, [publication]) do
      {:ok, %{columns: ["schemaname", "tablename", "oid"], rows: rows}} ->
        rows
        |> Enum.reduce(%{}, fn [schema, table, oid], acc ->
          acc
          |> Map.put({schema, table}, [oid])
          |> Map.update({schema}, [oid], &[oid | &1])
          |> Map.update({"*"}, [oid], &[oid | &1])
        end)
        |> Map.new(fn {key, oids} -> {key, Enum.sort(oids)} end)

      _ ->
        %{}
    end
  end
195

196
  @doc """
  Parses subscription parameters into the tuple that `create/5` passes to its insert query:
  `{action_filter, schema, table, filters, selected_columns}`.

  We currently support the following filters: 'eq', 'neq', 'lt', 'lte', 'gt', 'gte', 'in'

  Multiple filters can be combined with commas and are applied as AND conditions:
  `"col1=eq.val,col2=gt.5"` means `col1 = val AND col2 > 5`.

  An optional `select` list of column names restricts the subscription to those columns.
  It is rejected when either the schema or the table is the `*` wildcard.

  ## Examples

      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=eq.hey"})
      {:ok, {"*", "public", "messages", [{"subject", "eq", "hey"}], nil}}

  `in` filter:

      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=in.(hidee,ho)"})
      {:ok, {"*", "public", "messages", [{"subject", "in", "{hidee,ho}"}], nil}}

  AND composition — multiple filters separated by commas:

      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "id=gt.0,id=lt.100"})
      {:ok, {"*", "public", "messages", [{"id", "gt", "0"}, {"id", "lt", "100"}], nil}}

  empty or whitespace-only filter string is treated as no filter:

      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => ""})
      {:ok, {"*", "public", "messages", [], nil}}

      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "   "})
      {:ok, {"*", "public", "messages", [], nil}}

  no filter:

      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages"})
      {:ok, {"*", "public", "messages", [], nil}}

  only schema:

      iex> parse_subscription_params(%{"schema" => "public"})
      {:ok, {"*", "public", "*", [], nil}}

  only table:

      iex> parse_subscription_params(%{"table" => "messages"})
      {:ok, {"*", "public", "messages", [], nil}}

  column selection:

      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "select" => ["id", "subject"]})
      {:ok, {"*", "public", "messages", [], ["id", "subject"]}}

  column selection on a wildcard subscription is rejected:

      iex> parse_subscription_params(%{"schema" => "public", "select" => ["id"]})
      {:error, "Column selection is not supported for wildcard subscriptions. Provide an explicit schema and table name."}

  An unsupported filter will respond with an error tuple:

      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=like.hey"})
      {:error, ~s(Error parsing `filter` params: ["like", "hey"])}

  Catch `undefined` filters:

      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "undefined"})
      {:error, ~s(Error parsing `filter` params: ["undefined"])}

  Catch `missing params`:

      iex> parse_subscription_params(%{})
      {:error, ~s(No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: %{})}

  """

  @spec parse_subscription_params(map()) :: {:ok, subscription_params} | {:error, binary()}
  def parse_subscription_params(params) do
    action_filter = action_filter(params)

    # Each step returns a self-describing `{:error, message}`, so the first failure
    # falls straight through. The step order fixes which error wins when several apply.
    with {:ok, selected_columns} <- parse_select(params),
         {:ok, schema, table, raw_filter} <- extract_target(params),
         {:ok, filters} <- parse_optional_filter(raw_filter),
         :ok <- reject_select_on_wildcard(schema, table, selected_columns) do
      {:ok, {action_filter, schema, table, filters, selected_columns}}
    end
  end

  # Resolves which schema/table the params address, plus the raw filter string
  # (`nil` when no filter was given). A missing table means `*`; a missing schema
  # means `public`.
  defp extract_target(%{"schema" => schema, "table" => table, "filter" => filter})
       when is_binary(schema) and is_binary(table) and is_binary(filter),
       do: {:ok, schema, table, filter}

  defp extract_target(%{"schema" => schema, "table" => table} = params)
       when is_binary(schema) and is_binary(table) and not is_map_key(params, "filter"),
       do: {:ok, schema, table, nil}

  defp extract_target(%{"schema" => schema} = params)
       when is_binary(schema) and not is_map_key(params, "table") and
              not is_map_key(params, "filter"),
       do: {:ok, schema, "*", nil}

  defp extract_target(%{"table" => table} = params)
       when is_binary(table) and not is_map_key(params, "schema") and
              not is_map_key(params, "filter"),
       do: {:ok, "public", table, nil}

  # Never echo params that carry a token back into the error message.
  defp extract_target(params)
       when is_map_key(params, "user_token") or is_map_key(params, "auth_token") do
    {:error,
     "No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: <redacted>"}
  end

  defp extract_target(params) do
    {:error,
     "No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: #{inspect(params)}"}
  end

  # Parses the raw filter string, if any, prefixing parse failures with their origin.
  defp parse_optional_filter(nil), do: {:ok, []}

  defp parse_optional_filter(raw_filter) do
    case parse_filters(raw_filter) do
      {:ok, filters} -> {:ok, filters}
      {:error, reason} -> {:error, "Error parsing `filter` params: #{reason}"}
    end
  end
311

312
  # Reads the optional `select` param.
  #
  # A list keeps only its string entries; if none remain the result is `nil`.
  # A bare string is rejected with an error; anything else (including a missing
  # key) yields `nil`.
  defp parse_select(%{"select" => str}) when is_binary(str) do
    {:error, "Error parsing `select` params: expected a list of column name strings, e.g. select: [\"col1\", \"col2\"]"}
  end

  defp parse_select(%{"select" => cols}) when is_list(cols) do
    names = for col <- cols, is_binary(col), do: col

    if names == [], do: {:ok, nil}, else: {:ok, names}
  end

  defp parse_select(_), do: {:ok, nil}
324

325
  # Column selection only makes sense for one concrete table: returns an error when
  # columns were selected and either the schema or the table is the `*` wildcard,
  # `:ok` otherwise.
  defp reject_select_on_wildcard(schema, table, selected_columns) do
    if selected_columns != nil and "*" in [schema, table] do
      {:error, "Column selection is not supported for wildcard subscriptions. Provide an explicit schema and table name."}
    else
      :ok
    end
  end
333

334
  # Normalises the `event` param to "INSERT", "UPDATE" or "DELETE" (matched
  # case-insensitively). Anything else, including a missing or non-string event,
  # becomes the "*" wildcard.
  defp action_filter(%{"event" => event}) when is_binary(event) do
    upcased = String.upcase(event)

    if upcased in ["INSERT", "UPDATE", "DELETE"], do: upcased, else: "*"
  end

  defp action_filter(_), do: "*"
346

347
  # Parses a comma-separated filter string into a list of `{column, type, value}`
  # tuples. A blank string means "no filters".
  defp parse_filters(filter) when is_binary(filter) do
    trimmed = String.trim(filter)

    if trimmed == "" do
      {:ok, []}
    else
      scan(trimmed, trimmed, 0, 0, 0, [])
    end
  end
353

354
  # Walks the filter string one byte at a time, splitting it on commas that sit
  # outside parentheses (so the commas of an `in.(a,b)` list stay in one segment).
  #
  #   * first argument - the bytes still to be read
  #   * `source`       - the complete string being scanned
  #   * `offset`/`size` - byte window of the segment currently being collected
  #   * `nesting`      - number of currently open parentheses (never below zero)
  #   * `parsed`       - filters parsed so far, most recent first
  #
  # Input exhausted: the pending window is the last segment.
  defp scan(<<>>, source, offset, size, _nesting, parsed) do
    with {:ok, filter} <- parse_segment(binary_part(source, offset, size)) do
      {:ok, Enum.reverse([filter | parsed])}
    end
  end

  defp scan("(" <> tail, source, offset, size, nesting, parsed),
    do: scan(tail, source, offset, size + 1, nesting + 1, parsed)

  # A stray closing parenthesis must not push the nesting level below zero.
  defp scan(")" <> tail, source, offset, size, nesting, parsed),
    do: scan(tail, source, offset, size + 1, max(0, nesting - 1), parsed)

  # Top-level comma: close the current segment and start the next one right after it.
  defp scan("," <> tail, source, offset, size, 0, parsed) do
    with {:ok, filter} <- parse_segment(binary_part(source, offset, size)) do
      scan(tail, source, offset + size + 1, 0, 0, [filter | parsed])
    end
  end

  # Any other byte (including a comma inside parentheses) extends the window.
  defp scan(<<_byte, tail::binary>>, source, offset, size, nesting, parsed),
    do: scan(tail, source, offset, size + 1, nesting, parsed)
381

382
  # Parses one `column=type.value` segment into `{column, type, value}`.
  # Surrounding whitespace is ignored; a blank segment is an error.
  defp parse_segment(segment) do
    segment
    |> String.trim()
    |> parse_trimmed_segment()
  end

  defp parse_trimmed_segment("") do
    {:error, "filter must not contain empty segments (check for extra commas)"}
  end

  defp parse_trimmed_segment(text) do
    with [column, remainder] <- String.split(text, "=", parts: 2),
         [type, raw_value] when type in @filter_types <- String.split(remainder, ".", parts: 2),
         {:ok, value} <- format_filter_value(type, raw_value) do
      {:ok, {column, type, value}}
    else
      # Errors that already carry a message pass through untouched; any other
      # mismatch is reported as the inspected term that failed to match.
      {:error, _message} = failure -> failure
      mismatch -> {:error, inspect(mismatch)}
    end
  end
399

400
  # Converts a filter value into the form stored with the subscription.
  #
  # For `in` the value must be wrapped in parentheses, which are replaced by curly
  # braces: "(a,b)" becomes "{a,b}". Values of every other filter type are passed
  # through unchanged.
  defp format_filter_value("in", "(" <> tail) do
    if String.ends_with?(tail, ")") do
      {:ok, "{" <> binary_part(tail, 0, byte_size(tail) - 1) <> "}"}
    else
      {:error, "`in` filter value must be wrapped by parentheses"}
    end
  end

  defp format_filter_value("in", _value) do
    {:error, "`in` filter value must be wrapped by parentheses"}
  end

  defp format_filter_value(_filter, value), do: {:ok, value}
411
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc