• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / e4000c5941bf3292f97209a81dfd0c7859cedca9-PR-1624

21 Nov 2025 06:31AM UTC coverage: 87.693% (-0.6%) from 88.285%
e4000c5941bf3292f97209a81dfd0c7859cedca9-PR-1624

Pull #1624

github

edgurgel
WIP
Pull Request #1624: WIP: filter postgres changes by action

9 of 10 new or added lines in 1 file covered. (90.0%)

17 existing lines in 5 files now uncovered.

2380 of 2714 relevant lines covered (87.69%)

4364.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.67
/lib/extensions/postgres_cdc_rls/subscriptions.ex
1
defmodule Extensions.PostgresCdcRls.Subscriptions do
2
  @moduledoc """
3
  This module consolidates subscriptions handling
4
  """
5
  use Realtime.Logs
6

7
  import Postgrex, only: [transaction: 2, query: 3, rollback: 2]
8

9
  @type conn() :: Postgrex.conn()
10
  @type filter :: {binary, binary, binary}
11
  @type subscription_params :: {binary, binary, binary, [filter]}
12
  @type subscription_list :: [%{id: binary, claims: map, subscription_params: subscription_params}]
13

14
  @filter_types ["eq", "neq", "lt", "lte", "gt", "gte", "in"]
15

16
  @spec create(conn(), String.t(), subscription_list, pid(), pid()) ::
17
          {:ok, Postgrex.Result.t()}
18
          | {:error, Exception.t() | {:exit, term} | {:subscription_insert_failed, String.t()}}
19

20
  def create(conn, publication, subscription_list, manager, caller) do
46✔
21
    sql = "with sub_tables as (
46✔
22
        select
23
        rr.entity
24
        from
25
        pg_publication_tables pub,
26
        lateral (
27
        select
28
        format('%I.%I', pub.schemaname, pub.tablename)::regclass entity
29
        ) rr
30
        where
31
        pub.pubname = $1
32
        and pub.schemaname like (case $2 when '*' then '%' else $2 end)
33
        and pub.tablename like (case $3 when '*' then '%' else $3 end)
34
     )
35
     insert into realtime.subscription as x(
36
        subscription_id,
37
        entity,
38
        filters,
39
        claims,
40
        action_filter
41
      )
42
      select
43
        $4::text::uuid,
44
        sub_tables.entity,
45
        $6,
46
        $5,
47
        $7
48
      from
49
        sub_tables
50
        on conflict
51
        (subscription_id, entity, filters, action_filter)
52
        do update set
53
        claims = excluded.claims,
54
        created_at = now()
55
      returning
56
         id"
57

58
    transaction(conn, fn conn ->
46✔
59
      Enum.map(subscription_list, fn %{
43✔
60
                                       id: id,
61
                                       claims: claims,
62
                                       subscription_params: params = {action_filter, schema, table, filters}
63
                                     } ->
64
        case query(conn, sql, [publication, schema, table, id, claims, filters, action_filter]) do
60✔
65
          {:ok, %{num_rows: num} = result} when num > 0 ->
66
            send(manager, {:subscribed, {caller, id}})
48✔
67
            result
48✔
68

69
          {:ok, _} ->
70
            msg =
8✔
71
              "Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [#{params_to_log(params)}]"
8✔
72

73
            rollback(conn, {:subscription_insert_failed, msg})
8✔
74

75
          {:error, exception} ->
76
            msg =
4✔
77
              "Unable to subscribe to changes with given parameters. An exception happened so please check your connect parameters: [#{params_to_log(params)}]. Exception: #{Exception.message(exception)}"
4✔
78

79
            rollback(conn, {:subscription_insert_failed, msg})
4✔
80
        end
81
      end)
82
    end)
83
  rescue
84
    e in DBConnection.ConnectionError -> {:error, e}
1✔
85
  catch
86
    :exit, reason -> {:error, {:exit, reason}}
2✔
87
  end
88

89
  defp params_to_log({action_filter, schema, table, filters}) do
90
    [event: action_filter, schema: schema, table: table, filters: filters]
91
    |> Enum.map_join(", ", fn {k, v} -> "#{k}: #{to_log(v)}" end)
12✔
92
  end
93

94
  @spec delete(conn(), String.t()) :: any()
95
  def delete(conn, id) do
96
    Logger.debug("Delete subscription")
1✔
97
    sql = "delete from realtime.subscription where subscription_id = $1"
1✔
98
    # TODO: connection can be not available
99
    {:ok, _} = query(conn, sql, [id])
1✔
100
  end
101

102
  @spec delete_all(conn()) :: {:ok, Postgrex.Result.t()} | {:error, Exception.t()}
103
  def delete_all(conn) do
104
    Logger.debug("Delete all subscriptions")
27✔
105
    query(conn, "delete from realtime.subscription;", [])
27✔
106
  end
107

108
  @spec delete_multi(conn(), [Ecto.UUID.t()]) ::
109
          {:ok, Postgrex.Result.t()} | {:error, Exception.t()}
110
  def delete_multi(conn, ids) do
111
    Logger.debug("Delete multi ids subscriptions")
1✔
112
    sql = "delete from realtime.subscription where subscription_id = ANY($1::uuid[])"
1✔
113
    query(conn, sql, [ids])
1✔
114
  end
115

116
  @spec maybe_delete_all(conn()) :: {:ok, Postgrex.Result.t()} | {:error, Exception.t()}
117
  def maybe_delete_all(conn) do
118
    query(
44✔
119
      conn,
120
      "do $$
121
        begin
122
          if exists (
123
            select 1
124
            from pg_tables
125
            where schemaname = 'realtime'
126
              and tablename  = 'subscription'
127
          )
128
          then
129
            delete from realtime.subscription;
130
          end if;
131
      end $$",
132
      []
133
    )
134
  end
135

136
  @spec fetch_publication_tables(conn(), String.t()) ::
137
          %{
138
            {<<_::1>>} => [integer()],
139
            {String.t()} => [integer()],
140
            {String.t(), String.t()} => [integer()]
141
          }
142
          | %{}
143
  def fetch_publication_tables(conn, publication) do
144
    sql = "select
75✔
145
    schemaname, tablename, format('%I.%I', schemaname, tablename)::regclass as oid
146
    from pg_publication_tables where pubname = $1"
147

148
    case query(conn, sql, [publication]) do
75✔
149
      {:ok, %{columns: ["schemaname", "tablename", "oid"], rows: rows}} ->
150
        Enum.reduce(rows, %{}, fn [schema, table, oid], acc ->
151
          if String.contains?(table, " ") do
1,260✔
152
            log_error(
×
153
              "TableHasSpacesInName",
154
              "Table name cannot have spaces: \"#{schema}\".\"#{table}\""
×
155
            )
156
          end
157

158
          Map.put(acc, {schema, table}, [oid])
159
          |> Map.update({schema}, [oid], &[oid | &1])
840✔
160
          |> Map.update({"*"}, [oid], &[oid | &1])
1,260✔
161
        end)
162
        |> Enum.reduce(%{}, fn {k, v}, acc -> Map.put(acc, k, Enum.sort(v)) end)
75✔
163

164
      _ ->
165
        %{}
×
166
    end
167
  end
168

169
  @doc """
170
  Parses subscription filter parameters into something we can pass into our `create_subscription` query.
171

172
  We currently support the following filters: 'eq', 'neq', 'lt', 'lte', 'gt', 'gte', 'in'
173

174
  ## Examples
175

176
      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=eq.hey"})
177
      {:ok, {"*", "public", "messages", [{"subject", "eq", "hey"}]}}
178

179
  `in` filter:
180

181
      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=in.(hidee,ho)"})
182
      {:ok, {"*", "public", "messages", [{"subject", "in", "{hidee,ho}"}]}}
183

184
  no filter:
185

186
      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages"})
187
      {:ok, {"*", "public", "messages", []}}
188

189
  only schema:
190

191
      iex> parse_subscription_params(%{"schema" => "public"})
192
      {:ok, {"*", "public", "*", []}}
193

194
  only table:
195

196
      iex> parse_subscription_params(%{"table" => "messages"})
197
      {:ok, {"*", "public", "messages", []}}
198

199
  An unsupported filter will respond with an error tuple:
200

201
      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=like.hey"})
202
      {:error, ~s(Error parsing `filter` params: ["like", "hey"])}
203

204
  Catch `undefined` filters:
205

206
      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "undefined"})
207
      {:error, ~s(Error parsing `filter` params: ["undefined"])}
208

209
  Catch `missing params`:
210

211
      iex> parse_subscription_params(%{})
212
      {:error, ~s(No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: %{})}
213

214
  """
215

216
  @spec parse_subscription_params(map()) :: {:ok, subscription_params} | {:error, binary()}
217
  def parse_subscription_params(params) do
218
    action_filter = action_filter(params)
59✔
219

220
    case params do
59✔
221
      %{"schema" => schema, "table" => table, "filter" => filter} ->
222
        with [col, rest] <- String.split(filter, "=", parts: 2),
10✔
223
             [filter_type, value] when filter_type in @filter_types <-
8✔
224
               String.split(rest, ".", parts: 2),
225
             {:ok, formatted_value} <- format_filter_value(filter_type, value) do
7✔
226
          {:ok, {action_filter, schema, table, [{col, filter_type, formatted_value}]}}
227
        else
228
          {:error, msg} ->
×
229
            {:error, "Error parsing `filter` params: #{msg}"}
×
230

231
          e ->
3✔
232
            {:error, "Error parsing `filter` params: #{inspect(e)}"}
233
        end
234

235
      %{"schema" => schema, "table" => table} ->
12✔
236
        {:ok, {action_filter, schema, table, []}}
237

238
      %{"schema" => schema} ->
35✔
239
        {:ok, {action_filter, schema, "*", []}}
240

241
      %{"table" => table} ->
1✔
242
        {:ok, {action_filter, "public", table, []}}
243

244
      map when is_map_key(map, "user_token") or is_map_key(map, "auth_token") ->
×
245
        {:error,
246
         "No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: <redacted>"}
247

248
      error ->
1✔
249
        {:error,
250
         "No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: #{inspect(error)}"}
251
    end
252
  end
253

254
  defp action_filter(%{"event" => "*"}), do: "*"
28✔
255

256
  defp action_filter(%{"event" => event}) when is_binary(event) do
257
    case String.upcase(event) do
13✔
258
      "INSERT" -> "INSERT"
8✔
259
      "UPDATE" -> "UPDATE"
2✔
260
      "DELETE" -> "DELETE"
3✔
NEW
261
      _ -> "*"
×
262
    end
263
  end
264

265
  defp action_filter(_), do: "*"
18✔
266

267
  defp format_filter_value(filter, value) do
268
    case filter do
7✔
269
      "in" ->
270
        case Regex.run(~r/^\((.*)\)$/, value) do
1✔
271
          nil ->
×
272
            {:error, "`in` filter value must be wrapped by parentheses"}
273

274
          [_, new_value] ->
1✔
275
            {:ok, "{#{new_value}}"}
1✔
276
        end
277

278
      _ ->
6✔
279
        {:ok, value}
280
    end
281
  end
282
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc