• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / 25735355879

12 May 2026 12:46PM UTC coverage: 91.755% (-0.09%) from 91.844%
25735355879

Pull #1852

github

web-flow
Merge 0134591aa into 016eaf3dc
Pull Request #1852: feat: setup `supabase_realtime_admin`

3105 of 3384 relevant lines covered (91.76%)

4690.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.0
/lib/extensions/postgres_cdc_rls/subscriptions.ex
1
defmodule Extensions.PostgresCdcRls.Subscriptions do
2
  @moduledoc """
3
  This module consolidates subscriptions handling
4
  """
5
  use Realtime.Logs
6

7
  import Postgrex, only: [transaction: 3, query: 3, rollback: 2]
8

9
  @type conn() :: Postgrex.conn()
10
  @type filter :: {binary, binary, binary}
11
  @type subscription_params :: {action_filter :: binary, schema :: binary, table :: binary, [filter]}
12
  @type subscription_list :: [%{id: binary, claims: map, subscription_params: subscription_params}]
13

14
  @filter_types ["eq", "neq", "lt", "lte", "gt", "gte", "in"]
15

16
  @spec create(conn(), String.t(), subscription_list, pid(), pid()) ::
17
          {:ok, Postgrex.Result.t()}
18
          | {:error, Exception.t() | {:exit, term} | {:subscription_insert_failed, String.t()}}
19

20
  def create(conn, publication, subscription_list, manager, caller) do
55✔
21
    opts = [timeout: 10_000]
55✔
22

23
    transaction(
55✔
24
      conn,
25
      fn conn ->
26
        Enum.map(subscription_list, fn %{id: id, claims: claims, subscription_params: params} ->
52✔
27
          case query(conn, publication, id, claims, params) do
75✔
28
            {:ok, %{num_rows: num} = result} when num > 0 ->
29
              send(manager, {:subscribed, {caller, id}})
64✔
30
              result
64✔
31

32
            {:ok, _} ->
33
              msg =
7✔
34
                "Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [#{params_to_log(params)}]"
7✔
35

36
              rollback(conn, {:subscription_insert_failed, msg})
7✔
37

38
            {:error, exception} ->
39
              msg =
4✔
40
                "Unable to subscribe to changes with given parameters. An exception happened so please check your connect parameters: [#{params_to_log(params)}]. Exception: #{Exception.message(exception)}"
4✔
41

42
              rollback(conn, {:subscription_insert_failed, msg})
4✔
43
          end
44
        end)
45
      end,
46
      opts
47
    )
48
  rescue
49
    e in DBConnection.ConnectionError -> {:error, e}
1✔
50
  catch
51
    :exit, reason -> {:error, {:exit, reason}}
2✔
52
  end
53

54
  defp query(conn, publication, id, claims, subscription_params) do
55
    sql = "with sub_tables as (
75✔
56
        select
57
        rr.entity
58
        from
59
        pg_publication_tables pub,
60
        lateral (
61
        select
62
        format('%I.%I', pub.schemaname, pub.tablename)::regclass entity
63
        ) rr
64
        where
65
        pub.pubname = $1
66
        and pub.schemaname like (case $2 when '*' then '%' else $2 end)
67
        and pub.tablename like (case $3 when '*' then '%' else $3 end)
68
     )
69
     insert into realtime.subscription as x(
70
        subscription_id,
71
        entity,
72
        filters,
73
        claims,
74
        action_filter
75
      )
76
      select
77
        $4::text::uuid,
78
        sub_tables.entity,
79
        $6,
80
        $5,
81
        $7
82
      from
83
        sub_tables
84
        on conflict
85
        (subscription_id, entity, filters, action_filter)
86
        do update set
87
        claims = excluded.claims,
88
        created_at = now()
89
      returning
90
         id"
91
    {action_filter, schema, table, filters} = subscription_params
75✔
92
    query(conn, sql, [publication, schema, table, id, claims, filters, action_filter])
75✔
93
  end
94

95
  defp params_to_log({action_filter, schema, table, filters}) do
96
    [event: action_filter, schema: schema, table: table, filters: filters]
97
    |> Enum.map_join(", ", fn {k, v} -> "#{k}: #{to_log(v)}" end)
11✔
98
  end
99

100
  @spec delete(conn(), String.t()) :: {:ok, Postgrex.Result.t()} | {:error, any()}
101
  def delete(conn, id) do
3✔
102
    Logger.debug("Delete subscription")
3✔
103
    sql = "delete from realtime.subscription where subscription_id = $1"
3✔
104

105
    case query(conn, sql, [id]) do
3✔
106
      {:error, reason} ->
107
        log_error("SubscriptionDeletionFailed", reason)
1✔
108
        {:error, reason}
109

110
      result ->
111
        result
1✔
112
    end
113
  catch
114
    :exit, reason ->
115
      log_error("SubscriptionDeletionFailed", {:exit, reason})
1✔
116
      {:error, {:exit, reason}}
117
  end
118

119
  @spec delete_all(conn()) :: :ok
120
  def delete_all(conn) do
41✔
121
    Logger.debug("Delete all subscriptions")
41✔
122

123
    case query(conn, "delete from realtime.subscription;", []) do
41✔
124
      {:ok, _} -> :ok
39✔
125
      {:error, reason} -> log_error("SubscriptionDeletionFailed", reason)
1✔
126
    end
127
  catch
128
    :exit, reason -> log_error("SubscriptionDeletionFailed", {:exit, reason})
1✔
129
  end
130

131
  @spec delete_multi(conn(), [Ecto.UUID.t()]) ::
132
          {:ok, Postgrex.Result.t()} | {:error, Exception.t()}
133
  def delete_multi(conn, ids) do
134
    Logger.debug("Delete multi ids subscriptions")
3✔
135
    sql = "delete from realtime.subscription where subscription_id = ANY($1::uuid[])"
3✔
136
    query(conn, sql, [ids])
3✔
137
  end
138

139
  @spec delete_all_if_table_exists(conn()) :: :ok
140
  def delete_all_if_table_exists(conn) do
57✔
141
    case query(
57✔
142
           conn,
143
           "do $$
144
        begin
145
          if exists (
146
            select 1
147
            from pg_tables
148
            where schemaname = 'realtime'
149
              and tablename  = 'subscription'
150
          )
151
          then
152
            delete from realtime.subscription;
153
          end if;
154
      end $$",
155
           []
156
         ) do
157
      {:ok, _} -> :ok
48✔
158
      {:error, reason} -> log_error("SubscriptionCleanupFailed", reason)
1✔
159
    end
160
  catch
161
    :exit, reason -> log_error("SubscriptionCleanupFailed", {:exit, reason})
1✔
162
  end
163

164
  @spec fetch_publication_tables(conn(), String.t()) ::
165
          %{
166
            {<<_::1>>} => [integer()],
167
            {String.t()} => [integer()],
168
            {String.t(), String.t()} => [integer()]
169
          }
170
          | %{}
171
  def fetch_publication_tables(conn, publication) do
172
    sql = "select
98✔
173
    schemaname, tablename, format('%I.%I', schemaname, tablename)::regclass as oid
174
    from pg_publication_tables where pubname = $1"
175

176
    case query(conn, sql, [publication]) do
98✔
177
      {:ok, %{columns: ["schemaname", "tablename", "oid"], rows: rows}} ->
178
        Enum.reduce(rows, %{}, fn [schema, table, oid], acc ->
179
          if String.contains?(table, " ") do
87✔
180
            log_error(
×
181
              "TableHasSpacesInName",
182
              "Table name cannot have spaces: \"#{schema}\".\"#{table}\""
×
183
            )
184
          end
185

186
          Map.put(acc, {schema, table}, [oid])
187
          |> Map.update({schema}, [oid], &[oid | &1])
×
188
          |> Map.update({"*"}, [oid], &[oid | &1])
87✔
189
        end)
190
        |> Enum.reduce(%{}, fn {k, v}, acc -> Map.put(acc, k, Enum.sort(v)) end)
98✔
191

192
      _ ->
193
        %{}
×
194
    end
195
  end
196

197
  @doc """
198
  Parses subscription filter parameters into something we can pass into our `create_subscription` query.
199

200
  We currently support the following filters: 'eq', 'neq', 'lt', 'lte', 'gt', 'gte', 'in'
201

202
  ## Examples
203

204
      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=eq.hey"})
205
      {:ok, {"*", "public", "messages", [{"subject", "eq", "hey"}]}}
206

207
  `in` filter:
208

209
      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=in.(hidee,ho)"})
210
      {:ok, {"*", "public", "messages", [{"subject", "in", "{hidee,ho}"}]}}
211

212
  no filter:
213

214
      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages"})
215
      {:ok, {"*", "public", "messages", []}}
216

217
  only schema:
218

219
      iex> parse_subscription_params(%{"schema" => "public"})
220
      {:ok, {"*", "public", "*", []}}
221

222
  only table:
223

224
      iex> parse_subscription_params(%{"table" => "messages"})
225
      {:ok, {"*", "public", "messages", []}}
226

227
  An unsupported filter will respond with an error tuple:
228

229
      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=like.hey"})
230
      {:error, ~s(Error parsing `filter` params: ["like", "hey"])}
231

232
  Catch `undefined` filters:
233

234
      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "undefined"})
235
      {:error, ~s(Error parsing `filter` params: ["undefined"])}
236

237
  Catch `missing params`:
238

239
      iex> parse_subscription_params(%{})
240
      {:error, ~s(No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: %{})}
241

242
  """
243

244
  @spec parse_subscription_params(map()) :: {:ok, subscription_params} | {:error, binary()}
245
  def parse_subscription_params(params) do
246
    action_filter = action_filter(params)
68✔
247

248
    case params do
68✔
249
      %{"schema" => schema, "table" => table, "filter" => filter}
250
      when is_binary(schema) and is_binary(table) and is_binary(filter) ->
251
        with [col, rest] <- String.split(filter, "=", parts: 2),
12✔
252
             [filter_type, value] when filter_type in @filter_types <-
10✔
253
               String.split(rest, ".", parts: 2),
254
             {:ok, formatted_value} <- format_filter_value(filter_type, value) do
9✔
255
          {:ok, {action_filter, schema, table, [{col, filter_type, formatted_value}]}}
256
        else
257
          {:error, msg} ->
×
258
            {:error, "Error parsing `filter` params: #{msg}"}
×
259

260
          e ->
3✔
261
            {:error, "Error parsing `filter` params: #{inspect(e)}"}
262
        end
263

264
      %{"schema" => schema, "table" => table}
265
      when is_binary(schema) and is_binary(table) and not is_map_key(params, "filter") ->
14✔
266
        {:ok, {action_filter, schema, table, []}}
267

268
      %{"schema" => schema}
269
      when is_binary(schema) and not is_map_key(params, "table") and not is_map_key(params, "filter") ->
37✔
270
        {:ok, {action_filter, schema, "*", []}}
271

272
      %{"table" => table}
273
      when is_binary(table) and not is_map_key(params, "schema") and not is_map_key(params, "filter") ->
1✔
274
        {:ok, {action_filter, "public", table, []}}
275

276
      map when is_map_key(map, "user_token") or is_map_key(map, "auth_token") ->
×
277
        {:error,
278
         "No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: <redacted>"}
279

280
      error ->
4✔
281
        {:error,
282
         "No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: #{inspect(error)}"}
283
    end
284
  end
285

286
  defp action_filter(%{"event" => "*"}), do: "*"
30✔
287

288
  defp action_filter(%{"event" => event}) when is_binary(event) do
289
    case String.upcase(event) do
16✔
290
      "INSERT" -> "INSERT"
10✔
291
      "UPDATE" -> "UPDATE"
3✔
292
      "DELETE" -> "DELETE"
3✔
293
      _ -> "*"
×
294
    end
295
  end
296

297
  defp action_filter(_), do: "*"
22✔
298

299
  defp format_filter_value(filter, value) do
300
    case filter do
9✔
301
      "in" ->
302
        case Regex.run(~r/^\((.*)\)$/, value) do
1✔
303
          nil ->
×
304
            {:error, "`in` filter value must be wrapped by parentheses"}
305

306
          [_, new_value] ->
1✔
307
            {:ok, "{#{new_value}}"}
1✔
308
        end
309

310
      _ ->
8✔
311
        {:ok, value}
312
    end
313
  end
314
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc