• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / 3259f7b813ea2fc01dbbef3b226b44d47067e413-PR-1593

27 Oct 2025 02:51AM UTC coverage: 86.46% (+0.2%) from 86.31%
3259f7b813ea2fc01dbbef3b226b44d47067e413-PR-1593

Pull #1593

github

edgurgel
fix: malformed PG changes subscriptions should not retry
Pull Request #1593: fix: malformed PG changes subscriptions should not retry

25 of 25 new or added lines in 4 files covered. (100.0%)

8 existing lines in 4 files now uncovered.

2203 of 2548 relevant lines covered (86.46%)

2591.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.71
/lib/extensions/postgres_cdc_rls/subscriptions.ex
1
defmodule Extensions.PostgresCdcRls.Subscriptions do
2
  @moduledoc """
3
  This module consolidates subscriptions handling
4
  """
5
  use Realtime.Logs
6

7
  import Postgrex, only: [transaction: 2, query: 3, rollback: 2]
8

9
  @type conn() :: Postgrex.conn()
10
  @type filter :: {binary, binary, binary}
11
  @type subscription_params :: {binary, binary, [filter]}
12
  @type subscription_list :: [%{id: binary, claims: map, subscription_params: subscription_params}]
13

14
  @filter_types ["eq", "neq", "lt", "lte", "gt", "gte", "in"]
15

16
  @spec create(conn(), String.t(), subscription_list, pid(), pid()) ::
17
          {:ok, Postgrex.Result.t()}
18
          | {:error, Exception.t() | :malformed_subscription_params | {:subscription_insert_failed, map()}}
19
  def create(conn, publication, subscription_list, manager, caller) do
20
    sql = "with sub_tables as (
28✔
21
        select
22
        rr.entity
23
        from
24
        pg_publication_tables pub,
25
        lateral (
26
        select
27
        format('%I.%I', pub.schemaname, pub.tablename)::regclass entity
28
        ) rr
29
        where
30
        pub.pubname = $1
31
        and pub.schemaname like (case $2 when '*' then '%' else $2 end)
32
        and pub.tablename like (case $3 when '*' then '%' else $3 end)
33
     )
34
     insert into realtime.subscription as x(
35
        subscription_id,
36
        entity,
37
        filters,
38
        claims
39
      )
40
      select
41
        $4::text::uuid,
42
        sub_tables.entity,
43
        $6,
44
        $5
45
      from
46
        sub_tables
47
        on conflict
48
        (subscription_id, entity, filters)
49
        do update set
50
        claims = excluded.claims,
51
        created_at = now()
52
      returning
53
         id"
54

55
    transaction(conn, fn conn ->
28✔
56
      Enum.map(subscription_list, fn %{id: id, claims: claims, subscription_params: params = {schema, table, filters}} ->
28✔
57
        case query(conn, sql, [publication, schema, table, id, claims, filters]) do
48✔
58
          {:ok, %{num_rows: num} = result} when num > 0 ->
59
            send(manager, {:subscribed, {caller, id}})
42✔
60
            result
42✔
61

62
          {:ok, _} ->
63
            msg =
3✔
64
              "Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [#{params_to_log(params)}]"
3✔
65

66
            rollback(conn, msg)
3✔
67

68
          {:error, exception} ->
69
            msg =
3✔
70
              "Unable to subscribe to changes with given parameters. An exception happened so please check your connect parameters: [#{params_to_log(params)}]. Exception: #{Exception.message(exception)}"
3✔
71

72
            rollback(conn, msg)
3✔
73
        end
74
      end)
75
    end)
76
  end
77

78
  defp params_to_log({schema, table, filters}) do
79
    %{schema: schema, table: table, filters: filters}
80
    |> Enum.map_join(", ", fn {k, v} -> "#{k}: #{to_log(v)}" end)
6✔
81
  end
82

83
  @spec delete(conn(), String.t()) :: any()
84
  def delete(conn, id) do
85
    Logger.debug("Delete subscription")
1✔
86
    sql = "delete from realtime.subscription where subscription_id = $1"
1✔
87
    # TODO: connection can be not available
88
    {:ok, _} = query(conn, sql, [id])
1✔
89
  end
90

91
  @spec delete_all(conn()) :: {:ok, Postgrex.Result.t()} | {:error, Exception.t()}
92
  def delete_all(conn) do
93
    Logger.debug("Delete all subscriptions")
22✔
94
    query(conn, "delete from realtime.subscription;", [])
22✔
95
  end
96

97
  @spec delete_multi(conn(), [Ecto.UUID.t()]) ::
98
          {:ok, Postgrex.Result.t()} | {:error, Exception.t()}
99
  def delete_multi(conn, ids) do
100
    Logger.debug("Delete multi ids subscriptions")
2✔
101
    sql = "delete from realtime.subscription where subscription_id = ANY($1::uuid[])"
2✔
102
    query(conn, sql, [ids])
2✔
103
  end
104

105
  @spec maybe_delete_all(conn()) :: {:ok, Postgrex.Result.t()} | {:error, Exception.t()}
106
  def maybe_delete_all(conn) do
107
    query(
34✔
108
      conn,
109
      "do $$
110
        begin
111
          if exists (
112
            select 1
113
            from pg_tables
114
            where schemaname = 'realtime'
115
              and tablename  = 'subscription'
116
          )
117
          then
118
            delete from realtime.subscription;
119
          end if;
120
      end $$",
121
      []
122
    )
123
  end
124

125
  @spec fetch_publication_tables(conn(), String.t()) ::
126
          %{
127
            {<<_::1>>} => [integer()],
128
            {String.t()} => [integer()],
129
            {String.t(), String.t()} => [integer()]
130
          }
131
          | %{}
132
  def fetch_publication_tables(conn, publication) do
133
    sql = "select
53✔
134
    schemaname, tablename, format('%I.%I', schemaname, tablename)::regclass as oid
135
    from pg_publication_tables where pubname = $1"
136

137
    case query(conn, sql, [publication]) do
53✔
138
      {:ok, %{columns: ["schemaname", "tablename", "oid"], rows: rows}} ->
139
        Enum.reduce(rows, %{}, fn [schema, table, oid], acc ->
140
          if String.contains?(table, " ") do
900✔
141
            log_error(
×
142
              "TableHasSpacesInName",
143
              "Table name cannot have spaces: \"#{schema}\".\"#{table}\""
×
144
            )
145
          end
146

147
          Map.put(acc, {schema, table}, [oid])
148
          |> Map.update({schema}, [oid], &[oid | &1])
600✔
149
          |> Map.update({"*"}, [oid], &[oid | &1])
900✔
150
        end)
151
        |> Enum.reduce(%{}, fn {k, v}, acc -> Map.put(acc, k, Enum.sort(v)) end)
53✔
152

153
      _ ->
154
        %{}
×
155
    end
156
  end
157

158
  @doc """
159
  Parses subscription filter parameters into something we can pass into our `create_subscription` query.
160

161
  We currently support the following filters: 'eq', 'neq', 'lt', 'lte', 'gt', 'gte', 'in'
162

163
  ## Examples
164

165
      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=eq.hey"})
166
      {:ok, {"public", "messages", [{"subject", "eq", "hey"}]}}
167

168
  `in` filter:
169

170
      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=in.(hidee,ho)"})
171
      {:ok, {"public", "messages", [{"subject", "in", "{hidee,ho}"}]}}
172

173
  no filter:
174

175
      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages"})
176
      {:ok, {"public", "messages", []}}
177

178
  only schema:
179

180
      iex> parse_subscription_params(%{"schema" => "public"})
181
      {:ok, {"public", "*", []}}
182

183
  only table:
184

185
      iex> parse_subscription_params(%{"table" => "messages"})
186
      {:ok, {"public", "messages", []}}
187

188
  An unsupported filter will respond with an error tuple:
189

190
      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=like.hey"})
191
      {:error, ~s(Error parsing `filter` params: ["like", "hey"])}
192

193
  Catch `undefined` filters:
194

195
      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "undefined"})
196
      {:error, ~s(Error parsing `filter` params: ["undefined"])}
197

198
  Catch `missing params`:
199

200
      iex> parse_subscription_params(%{})
201
      {:error, ~s(No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: %{})}
202

203
  """
204

205
  @spec parse_subscription_params(map()) :: {:ok, subscription_params} | {:error, binary()}
206
  def parse_subscription_params(params) do
207
    case params do
32✔
208
      %{"schema" => schema, "table" => table, "filter" => filter} ->
209
        with [col, rest] <- String.split(filter, "=", parts: 2),
9✔
210
             [filter_type, value] when filter_type in @filter_types <-
7✔
211
               String.split(rest, ".", parts: 2),
212
             {:ok, formatted_value} <- format_filter_value(filter_type, value) do
6✔
213
          {:ok, {schema, table, [{col, filter_type, formatted_value}]}}
214
        else
215
          {:error, msg} ->
×
216
            {:error, "Error parsing `filter` params: #{msg}"}
×
217

218
          e ->
3✔
219
            {:error, "Error parsing `filter` params: #{inspect(e)}"}
220
        end
221

222
      %{"schema" => schema, "table" => table} ->
3✔
223
        {:ok, {schema, table, []}}
224

225
      %{"schema" => schema} ->
18✔
226
        {:ok, {schema, "*", []}}
227

228
      %{"table" => table} ->
1✔
229
        {:ok, {"public", table, []}}
230

UNCOV
231
      map when is_map_key(map, "user_token") or is_map_key(map, "auth_token") ->
×
232
        {:error,
233
         "No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: <redacted>"}
234

235
      error ->
1✔
236
        {:error,
237
         "No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: #{inspect(error)}"}
238
    end
239
  end
240

241
  defp format_filter_value(filter, value) do
242
    case filter do
6✔
243
      "in" ->
244
        case Regex.run(~r/^\((.*)\)$/, value) do
1✔
245
          nil ->
×
246
            {:error, "`in` filter value must be wrapped by parentheses"}
247

248
          [_, new_value] ->
1✔
249
            {:ok, "{#{new_value}}"}
1✔
250
        end
251

252
      _ ->
5✔
253
        {:ok, value}
254
    end
255
  end
256
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc