• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / 22756298512

06 Mar 2026 08:55AM UTC coverage: 91.911% (+1.8%) from 90.126%
22756298512

push

github

web-flow
fix: improve coverage (#1742)

4 of 4 new or added lines in 2 files covered. (100.0%)

26 existing lines in 6 files now uncovered.

2943 of 3202 relevant lines covered (91.91%)

2920.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.19
/lib/extensions/postgres_cdc_rls/subscriptions.ex
1
defmodule Extensions.PostgresCdcRls.Subscriptions do
2
  @moduledoc """
3
  This module consolidates subscriptions handling
4
  """
5
  use Realtime.Logs
6

7
  import Postgrex, only: [transaction: 2, query: 3, rollback: 2]
8

9
  @type conn() :: Postgrex.conn()
10
  @type filter :: {binary, binary, binary}
11
  @type subscription_params :: {action_filter :: binary, schema :: binary, table :: binary, [filter]}
12
  @type subscription_list :: [%{id: binary, claims: map, subscription_params: subscription_params}]
13

14
  @filter_types ["eq", "neq", "lt", "lte", "gt", "gte", "in"]
15

16
  @doc """
  Inserts every entry of `subscription_list` into `realtime.subscription` within a
  single transaction on `conn`.

  Each entry is a map with `:id`, `:claims` and `:subscription_params`. If any insert
  touches no rows, or the query returns an error, the whole transaction is rolled back
  with `{:subscription_insert_failed, message}`.

  `manager` receives one `{:subscribed, {caller, id}}` message per subscription, and
  only after the transaction has committed, so it is never told about subscriptions
  whose rows were rolled back.

  Returns `{:ok, results}` with one `Postgrex.Result` per subscription, or
  `{:error, reason}` when the transaction is rolled back, the connection fails
  (`DBConnection.ConnectionError`), or the connection process exits (`{:exit, reason}`).
  """
  @spec create(conn(), String.t(), subscription_list, pid(), pid()) ::
          {:ok, [Postgrex.Result.t()]}
          | {:error, Exception.t() | {:exit, term} | {:subscription_insert_failed, String.t()}}
  def create(conn, publication, subscription_list, manager, caller) do
    outcome =
      transaction(conn, fn conn ->
        Enum.map(subscription_list, fn %{id: id, claims: claims, subscription_params: params} ->
          case query(conn, publication, id, claims, params) do
            {:ok, %{num_rows: num} = result} when num > 0 ->
              result

            {:ok, _} ->
              msg =
                "Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [#{params_to_log(params)}]"

              rollback(conn, {:subscription_insert_failed, msg})

            {:error, exception} ->
              msg =
                "Unable to subscribe to changes with given parameters. An exception happened so please check your connect parameters: [#{params_to_log(params)}]. Exception: #{Exception.message(exception)}"

              rollback(conn, {:subscription_insert_failed, msg})
          end
        end)
      end)

    case outcome do
      {:ok, _results} ->
        # Notify only once the rows are committed: sending from inside the transaction
        # would announce subscriptions that a later rollback then discards.
        Enum.each(subscription_list, fn %{id: id} ->
          send(manager, {:subscribed, {caller, id}})
        end)

        outcome

      _ ->
        outcome
    end
  rescue
    e in DBConnection.ConnectionError -> {:error, e}
  catch
    :exit, reason -> {:error, {:exit, reason}}
  end
47

48
  # Upserts one subscription row per table of `publication` that matches the schema
  # and table taken from `subscription_params` (a `'*'` value is turned into the
  # `like` wildcard `'%'` by the SQL). On conflict the stored claims and `created_at`
  # are refreshed. Returns whatever `Postgrex.query/3` returns.
  defp query(conn, publication, id, claims, subscription_params) do
    {action_filter, schema, table, filters} = subscription_params

    statement = "with sub_tables as (
        select
        rr.entity
        from
        pg_publication_tables pub,
        lateral (
        select
        format('%I.%I', pub.schemaname, pub.tablename)::regclass entity
        ) rr
        where
        pub.pubname = $1
        and pub.schemaname like (case $2 when '*' then '%' else $2 end)
        and pub.tablename like (case $3 when '*' then '%' else $3 end)
     )
     insert into realtime.subscription as x(
        subscription_id,
        entity,
        filters,
        claims,
        action_filter
      )
      select
        $4::text::uuid,
        sub_tables.entity,
        $6,
        $5,
        $7
      from
        sub_tables
        on conflict
        (subscription_id, entity, filters, action_filter)
        do update set
        claims = excluded.claims,
        created_at = now()
      returning
         id"

    # Positional bindings follow the $1..$7 placeholders above.
    bindings = [publication, schema, table, id, claims, filters, action_filter]

    query(conn, statement, bindings)
  end
88

89
  # Renders subscription params as "event: ..., schema: ..., table: ..., filters: ..."
  # for use in error messages; each value is formatted with `to_log/1`.
  defp params_to_log({action_filter, schema, table, filters}) do
    fields = [event: action_filter, schema: schema, table: table, filters: filters]

    Enum.map_join(fields, ", ", fn {label, value} -> "#{label}: #{to_log(value)}" end)
  end
93

94
  @doc """
  Deletes the rows of `realtime.subscription` whose `subscription_id` equals `id`.

  A query error is logged under `SubscriptionDeletionFailed` and returned unchanged.
  An exit raised while querying is logged the same way and returned as
  `{:error, {:exit, reason}}`. Any other query result is returned as-is.
  """
  @spec delete(conn(), String.t()) :: {:ok, Postgrex.Result.t()} | {:error, any()}
  def delete(conn, id) do
    Logger.debug("Delete subscription")

    # Only the error tuple matches; every other result falls through `with` untouched.
    with {:error, reason} = failure <-
           query(conn, "delete from realtime.subscription where subscription_id = $1", [id]) do
      log_error("SubscriptionDeletionFailed", reason)
      failure
    end
  catch
    :exit, reason ->
      exit_reason = {:exit, reason}
      log_error("SubscriptionDeletionFailed", exit_reason)
      {:error, exit_reason}
  end
112

113
  @doc """
  Deletes every row of `realtime.subscription`.

  Returns `:ok` on success. Query errors and exits are logged under
  `SubscriptionDeletionFailed`; in those cases the return value is whatever
  `log_error/2` returns.
  """
  @spec delete_all(conn()) :: :ok
  def delete_all(conn) do
    Logger.debug("Delete all subscriptions")

    outcome = query(conn, "delete from realtime.subscription;", [])

    case outcome do
      {:ok, _} ->
        :ok

      {:error, reason} ->
        log_error("SubscriptionDeletionFailed", reason)
    end
  catch
    :exit, reason ->
      log_error("SubscriptionDeletionFailed", {:exit, reason})
  end
124

125
  @doc """
  Deletes the rows of `realtime.subscription` whose `subscription_id` is any of `ids`.

  The list is bound as a single `uuid[]` parameter. The query result is returned
  unchanged; exits are not caught here.
  """
  @spec delete_multi(conn(), [Ecto.UUID.t()]) ::
          {:ok, Postgrex.Result.t()} | {:error, Exception.t()}
  def delete_multi(conn, ids) do
    Logger.debug("Delete multi ids subscriptions")

    query(
      conn,
      "delete from realtime.subscription where subscription_id = ANY($1::uuid[])",
      [ids]
    )
  end
132

133
  @doc """
  Deletes every row of `realtime.subscription`, but only if that table exists.

  The existence check and the delete run in one anonymous `do $$ ... $$` block.
  Returns `:ok` on success. Query errors and exits are logged under
  `SubscriptionCleanupFailed`; in those cases the return value is whatever
  `log_error/2` returns.
  """
  @spec delete_all_if_table_exists(conn()) :: :ok
  def delete_all_if_table_exists(conn) do
    cleanup_sql = "do $$
        begin
          if exists (
            select 1
            from pg_tables
            where schemaname = 'realtime'
              and tablename  = 'subscription'
          )
          then
            delete from realtime.subscription;
          end if;
      end $$"

    case query(conn, cleanup_sql, []) do
      {:ok, _} ->
        :ok

      {:error, reason} ->
        log_error("SubscriptionCleanupFailed", reason)
    end
  catch
    :exit, reason ->
      log_error("SubscriptionCleanupFailed", {:exit, reason})
  end
157

158
  @doc """
  Builds a lookup of the tables that belong to `publication`.

  The returned map has three kinds of keys, each mapping to a sorted list of the
  `oid` values returned by the query:

    * `{schema, table}` - the single table
    * `{schema}` - every published table in that schema
    * `{"*"}` - every published table

  A table whose name contains a space is reported with `TableHasSpacesInName` but is
  still included. If the query fails, or returns columns other than the expected
  ones, an empty map is returned.
  """
  @spec fetch_publication_tables(conn(), String.t()) ::
          %{
            {String.t()} => [integer()],
            {String.t(), String.t()} => [integer()]
          }
          | %{}
  def fetch_publication_tables(conn, publication) do
    sql = "select
    schemaname, tablename, format('%I.%I', schemaname, tablename)::regclass as oid
    from pg_publication_tables where pubname = $1"

    case query(conn, sql, [publication]) do
      {:ok, %{columns: ["schemaname", "tablename", "oid"], rows: rows}} ->
        rows
        |> Enum.reduce(%{}, fn [schema, table, oid], acc ->
          if String.contains?(table, " ") do
            log_error(
              "TableHasSpacesInName",
              "Table name cannot have spaces: \"#{schema}\".\"#{table}\""
            )
          end

          acc
          |> Map.put({schema, table}, [oid])
          |> Map.update({schema}, [oid], &[oid | &1])
          |> Map.update({"*"}, [oid], &[oid | &1])
        end)
        # Oids were prepended while reducing; sort so each list has a stable order.
        |> Map.new(fn {key, oids} -> {key, Enum.sort(oids)} end)

      # Best effort: any failure or unexpected result shape yields an empty lookup.
      _ ->
        %{}
    end
  end
190

191
  @doc """
  Turns the subscription params sent by a client into the
  `{action_filter, schema, table, filters}` tuple used when creating subscriptions.

  A missing `schema` defaults to `"public"` and a missing `table` to `"*"`.
  Supported filter types: 'eq', 'neq', 'lt', 'lte', 'gt', 'gte', 'in'

  ## Examples

      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=eq.hey"})
      {:ok, {"*", "public", "messages", [{"subject", "eq", "hey"}]}}

  `in` filter:

      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=in.(hidee,ho)"})
      {:ok, {"*", "public", "messages", [{"subject", "in", "{hidee,ho}"}]}}

  no filter:

      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages"})
      {:ok, {"*", "public", "messages", []}}

  only schema:

      iex> parse_subscription_params(%{"schema" => "public"})
      {:ok, {"*", "public", "*", []}}

  only table:

      iex> parse_subscription_params(%{"table" => "messages"})
      {:ok, {"*", "public", "messages", []}}

  An unsupported filter will respond with an error tuple:

      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=like.hey"})
      {:error, ~s(Error parsing `filter` params: ["like", "hey"])}

  Catch `undefined` filters:

      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "undefined"})
      {:error, ~s(Error parsing `filter` params: ["undefined"])}

  Catch `missing params`:

      iex> parse_subscription_params(%{})
      {:error, ~s(No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: %{})}

  """
  @spec parse_subscription_params(map()) :: {:ok, subscription_params} | {:error, binary()}
  def parse_subscription_params(params) do
    event = action_filter(params)

    case params do
      %{"schema" => schema, "table" => table, "filter" => filter}
      when is_binary(schema) and is_binary(table) and is_binary(filter) ->
        with {:ok, parsed_filter} <- parse_filter(filter) do
          {:ok, {event, schema, table, [parsed_filter]}}
        end

      %{"schema" => schema, "table" => table}
      when is_binary(schema) and is_binary(table) and not is_map_key(params, "filter") ->
        {:ok, {event, schema, table, []}}

      %{"schema" => schema}
      when is_binary(schema) and not is_map_key(params, "table") and
             not is_map_key(params, "filter") ->
        {:ok, {event, schema, "*", []}}

      %{"table" => table}
      when is_binary(table) and not is_map_key(params, "schema") and
             not is_map_key(params, "filter") ->
        {:ok, {event, "public", table, []}}

      # Never echo params that carry a token back in the error message.
      with_token when is_map_key(with_token, "user_token") or is_map_key(with_token, "auth_token") ->
        {:error,
         "No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: <redacted>"}

      unrecognised ->
        {:error,
         "No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: #{inspect(unrecognised)}"}
    end
  end

  # Splits a "column=type.value" filter into `{column, type, formatted_value}`.
  # Any step that does not produce the expected shape becomes an error message.
  defp parse_filter(filter) do
    with [column, remainder] <- String.split(filter, "=", parts: 2),
         [operator, raw_value] when operator in @filter_types <-
           String.split(remainder, ".", parts: 2),
         {:ok, value} <- format_filter_value(operator, raw_value) do
      {:ok, {column, operator, value}}
    else
      {:error, reason} ->
        {:error, "Error parsing `filter` params: #{reason}"}

      unexpected ->
        {:error, "Error parsing `filter` params: #{inspect(unexpected)}"}
    end
  end
279

280
  # Normalises the requested "event" to "INSERT", "UPDATE" or "DELETE"
  # (case-insensitively). Anything else, including a missing or non-binary
  # event, becomes the catch-all "*".
  defp action_filter(%{"event" => event}) when is_binary(event) do
    upcased = String.upcase(event)

    if upcased in ["INSERT", "UPDATE", "DELETE"], do: upcased, else: "*"
  end

  defp action_filter(_), do: "*"
20✔
292

293
  # Converts an `in` filter value from "(a,b)" to "{a,b}". The value must be
  # wrapped in parentheses, otherwise an error tuple is returned.
  defp format_filter_value("in", value) do
    case Regex.run(~r/^\((.*)\)$/, value) do
      [_whole, inner] -> {:ok, "{#{inner}}"}
      nil -> {:error, "`in` filter value must be wrapped by parentheses"}
    end
  end

  # Every other filter type passes its value through untouched.
  defp format_filter_value(_filter, value), do: {:ok, value}
308
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc