• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / 60adbd884754a0b482822b93a160b72facfec740

02 Nov 2025 11:33PM UTC coverage: 87.053%. Remained the same
60adbd884754a0b482822b93a160b72facfec740

push

github

web-flow
fix: balance gen_rpc clients properly on GenRpcPubSub (#1603)

The worker as key doesn't have a significant cardinality to help
distributing across the multiple TCP connections gen_rpc uses

5 of 5 new or added lines in 1 file covered. (100.0%)

1 existing line in 1 file now uncovered.

2266 of 2603 relevant lines covered (87.05%)

2655.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.62
/lib/extensions/postgres_cdc_rls/subscriptions.ex
1
defmodule Extensions.PostgresCdcRls.Subscriptions do
2
  @moduledoc """
3
  This module consolidates subscriptions handling
4
  """
5
  use Realtime.Logs
6

7
  import Postgrex, only: [transaction: 2, query: 3, rollback: 2]
8

9
  @type conn() :: Postgrex.conn()
10
  @type filter :: {binary, binary, binary}
11
  @type subscription_params :: {binary, binary, [filter]}
12
  @type subscription_list :: [%{id: binary, claims: map, subscription_params: subscription_params}]
13

14
  @filter_types ["eq", "neq", "lt", "lte", "gt", "gte", "in"]
15

16
  @spec create(conn(), String.t(), subscription_list, pid(), pid()) ::
17
          {:ok, Postgrex.Result.t()}
18
          | {:error, Exception.t() | {:exit, term} | {:subscription_insert_failed, String.t()}}
19

20
  def create(conn, publication, subscription_list, manager, caller) do
37✔
21
    sql = "with sub_tables as (
37✔
22
        select
23
        rr.entity
24
        from
25
        pg_publication_tables pub,
26
        lateral (
27
        select
28
        format('%I.%I', pub.schemaname, pub.tablename)::regclass entity
29
        ) rr
30
        where
31
        pub.pubname = $1
32
        and pub.schemaname like (case $2 when '*' then '%' else $2 end)
33
        and pub.tablename like (case $3 when '*' then '%' else $3 end)
34
     )
35
     insert into realtime.subscription as x(
36
        subscription_id,
37
        entity,
38
        filters,
39
        claims
40
      )
41
      select
42
        $4::text::uuid,
43
        sub_tables.entity,
44
        $6,
45
        $5
46
      from
47
        sub_tables
48
        on conflict
49
        (subscription_id, entity, filters)
50
        do update set
51
        claims = excluded.claims,
52
        created_at = now()
53
      returning
54
         id"
55

56
    transaction(conn, fn conn ->
37✔
57
      Enum.map(subscription_list, fn %{id: id, claims: claims, subscription_params: params = {schema, table, filters}} ->
34✔
58
        case query(conn, sql, [publication, schema, table, id, claims, filters]) do
55✔
59
          {:ok, %{num_rows: num} = result} when num > 0 ->
60
            send(manager, {:subscribed, {caller, id}})
45✔
61
            result
45✔
62

63
          {:ok, _} ->
64
            msg =
6✔
65
              "Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [#{params_to_log(params)}]"
6✔
66

67
            rollback(conn, {:subscription_insert_failed, msg})
6✔
68

69
          {:error, exception} ->
70
            msg =
4✔
71
              "Unable to subscribe to changes with given parameters. An exception happened so please check your connect parameters: [#{params_to_log(params)}]. Exception: #{Exception.message(exception)}"
4✔
72

73
            rollback(conn, {:subscription_insert_failed, msg})
4✔
74
        end
75
      end)
76
    end)
77
  rescue
78
    e in DBConnection.ConnectionError -> {:error, e}
1✔
79
  catch
80
    :exit, reason -> {:error, {:exit, reason}}
2✔
81
  end
82

83
  defp params_to_log({schema, table, filters}) do
84
    [schema: schema, table: table, filters: filters]
85
    |> Enum.map_join(", ", fn {k, v} -> "#{k}: #{to_log(v)}" end)
10✔
86
  end
87

88
  @spec delete(conn(), String.t()) :: any()
89
  def delete(conn, id) do
90
    Logger.debug("Delete subscription")
1✔
91
    sql = "delete from realtime.subscription where subscription_id = $1"
1✔
92
    # TODO: connection can be not available
93
    {:ok, _} = query(conn, sql, [id])
1✔
94
  end
95

96
  @spec delete_all(conn()) :: {:ok, Postgrex.Result.t()} | {:error, Exception.t()}
97
  def delete_all(conn) do
98
    Logger.debug("Delete all subscriptions")
24✔
99
    query(conn, "delete from realtime.subscription;", [])
24✔
100
  end
101

102
  @spec delete_multi(conn(), [Ecto.UUID.t()]) ::
103
          {:ok, Postgrex.Result.t()} | {:error, Exception.t()}
104
  def delete_multi(conn, ids) do
105
    Logger.debug("Delete multi ids subscriptions")
2✔
106
    sql = "delete from realtime.subscription where subscription_id = ANY($1::uuid[])"
2✔
107
    query(conn, sql, [ids])
2✔
108
  end
109

110
  @spec maybe_delete_all(conn()) :: {:ok, Postgrex.Result.t()} | {:error, Exception.t()}
111
  def maybe_delete_all(conn) do
112
    query(
35✔
113
      conn,
114
      "do $$
115
        begin
116
          if exists (
117
            select 1
118
            from pg_tables
119
            where schemaname = 'realtime'
120
              and tablename  = 'subscription'
121
          )
122
          then
123
            delete from realtime.subscription;
124
          end if;
125
      end $$",
126
      []
127
    )
128
  end
129

130
  @spec fetch_publication_tables(conn(), String.t()) ::
131
          %{
132
            {<<_::1>>} => [integer()],
133
            {String.t()} => [integer()],
134
            {String.t(), String.t()} => [integer()]
135
          }
136
          | %{}
137
  def fetch_publication_tables(conn, publication) do
138
    sql = "select
61✔
139
    schemaname, tablename, format('%I.%I', schemaname, tablename)::regclass as oid
140
    from pg_publication_tables where pubname = $1"
141

142
    case query(conn, sql, [publication]) do
61✔
143
      {:ok, %{columns: ["schemaname", "tablename", "oid"], rows: rows}} ->
144
        Enum.reduce(rows, %{}, fn [schema, table, oid], acc ->
145
          if String.contains?(table, " ") do
1,044✔
146
            log_error(
×
147
              "TableHasSpacesInName",
148
              "Table name cannot have spaces: \"#{schema}\".\"#{table}\""
×
149
            )
150
          end
151

152
          Map.put(acc, {schema, table}, [oid])
153
          |> Map.update({schema}, [oid], &[oid | &1])
696✔
154
          |> Map.update({"*"}, [oid], &[oid | &1])
1,044✔
155
        end)
156
        |> Enum.reduce(%{}, fn {k, v}, acc -> Map.put(acc, k, Enum.sort(v)) end)
61✔
157

158
      _ ->
159
        %{}
×
160
    end
161
  end
162

163
  @doc """
164
  Parses subscription filter parameters into something we can pass into our `create_subscription` query.
165

166
  We currently support the following filters: 'eq', 'neq', 'lt', 'lte', 'gt', 'gte', 'in'
167

168
  ## Examples
169

170
      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=eq.hey"})
171
      {:ok, {"public", "messages", [{"subject", "eq", "hey"}]}}
172

173
  `in` filter:
174

175
      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=in.(hidee,ho)"})
176
      {:ok, {"public", "messages", [{"subject", "in", "{hidee,ho}"}]}}
177

178
  no filter:
179

180
      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages"})
181
      {:ok, {"public", "messages", []}}
182

183
  only schema:
184

185
      iex> parse_subscription_params(%{"schema" => "public"})
186
      {:ok, {"public", "*", []}}
187

188
  only table:
189

190
      iex> parse_subscription_params(%{"table" => "messages"})
191
      {:ok, {"public", "messages", []}}
192

193
  An unsupported filter will respond with an error tuple:
194

195
      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=like.hey"})
196
      {:error, ~s(Error parsing `filter` params: ["like", "hey"])}
197

198
  Catch `undefined` filters:
199

200
      iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "undefined"})
201
      {:error, ~s(Error parsing `filter` params: ["undefined"])}
202

203
  Catch `missing params`:
204

205
      iex> parse_subscription_params(%{})
206
      {:error, ~s(No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: %{})}
207

208
  """
209

210
  @spec parse_subscription_params(map()) :: {:ok, subscription_params} | {:error, binary()}
211
  def parse_subscription_params(params) do
212
    case params do
47✔
213
      %{"schema" => schema, "table" => table, "filter" => filter} ->
214
        with [col, rest] <- String.split(filter, "=", parts: 2),
9✔
215
             [filter_type, value] when filter_type in @filter_types <-
8✔
216
               String.split(rest, ".", parts: 2),
217
             {:ok, formatted_value} <- format_filter_value(filter_type, value) do
7✔
218
          {:ok, {schema, table, [{col, filter_type, formatted_value}]}}
219
        else
220
          {:error, msg} ->
×
221
            {:error, "Error parsing `filter` params: #{msg}"}
×
222

223
          e ->
2✔
224
            {:error, "Error parsing `filter` params: #{inspect(e)}"}
225
        end
226

227
      %{"schema" => schema, "table" => table} ->
12✔
228
        {:ok, {schema, table, []}}
229

230
      %{"schema" => schema} ->
25✔
231
        {:ok, {schema, "*", []}}
232

233
      %{"table" => table} ->
1✔
234
        {:ok, {"public", table, []}}
235

236
      map when is_map_key(map, "user_token") or is_map_key(map, "auth_token") ->
×
237
        {:error,
238
         "No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: <redacted>"}
239

UNCOV
240
      error ->
×
241
        {:error,
242
         "No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: #{inspect(error)}"}
243
    end
244
  end
245

246
  defp format_filter_value(filter, value) do
247
    case filter do
7✔
248
      "in" ->
249
        case Regex.run(~r/^\((.*)\)$/, value) do
1✔
250
          nil ->
×
251
            {:error, "`in` filter value must be wrapped by parentheses"}
252

253
          [_, new_value] ->
1✔
254
            {:ok, "{#{new_value}}"}
1✔
255
        end
256

257
      _ ->
6✔
258
        {:ok, value}
259
    end
260
  end
261
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc