• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / e1562ca22b57bd055f9114a64d1b08a27868ea36

22 May 2025 01:15AM UTC coverage: 83.161% (+0.8%) from 82.314%
e1562ca22b57bd055f9114a64d1b08a27868ea36

push

github

web-flow
chore: update README with DatabaseLackOfConnections error codes (#1381)

1768 of 2126 relevant lines covered (83.16%)

2037.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.25
/lib/extensions/postgres_cdc_rls/subscriptions.ex
1
defmodule Extensions.PostgresCdcRls.Subscriptions do
2
  @moduledoc """
3
  This module consolidates subscriptions handling
4
  """
5
  require Logger
6
  import Postgrex, only: [transaction: 2, query: 3, rollback: 2]
7
  import Realtime.Logs
8

9
  @type conn() :: Postgrex.conn()
10

11
  @filter_types ["eq", "neq", "lt", "lte", "gt", "gte", "in"]
12

13
  @spec create(conn(), String.t(), [map()], pid(), pid()) ::
14
          {:ok, Postgrex.Result.t()}
15
          | {:error, Exception.t() | :malformed_subscription_params | {:subscription_insert_failed, map()}}
16
  def create(conn, publication, params_list, manager, caller) do
17
    sql = "with sub_tables as (
15✔
18
        select
19
        rr.entity
20
        from
21
        pg_publication_tables pub,
22
        lateral (
23
        select
24
        format('%I.%I', pub.schemaname, pub.tablename)::regclass entity
25
        ) rr
26
        where
27
        pub.pubname = $1
28
        and pub.schemaname like (case $2 when '*' then '%' else $2 end)
29
        and pub.tablename like (case $3 when '*' then '%' else $3 end)
30
     )
31
     insert into realtime.subscription as x(
32
        subscription_id,
33
        entity,
34
        filters,
35
        claims
36
      )
37
      select
38
        $4::text::uuid,
39
        sub_tables.entity,
40
        $6,
41
        $5
42
      from
43
        sub_tables
44
        on conflict
45
        (subscription_id, entity, filters)
46
        do update set
47
        claims = excluded.claims,
48
        created_at = now()
49
      returning
50
         id"
51

52
    transaction(conn, fn conn ->
15✔
53
      Enum.map(params_list, fn %{id: id, claims: claims, params: params} ->
15✔
54
        case parse_subscription_params(params) do
33✔
55
          {:ok, [schema, table, filters]} ->
56
            case query(conn, sql, [publication, schema, table, id, claims, filters]) do
28✔
57
              {:ok, %{num_rows: num} = result} when num > 0 ->
58
                send(manager, {:subscribed, {caller, id}})
23✔
59
                result
23✔
60

61
              {:ok, _} ->
62
                msg =
5✔
63
                  "Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [#{params_to_log(params)}]"
5✔
64

65
                log_warning("RealtimeDisabledForConfiguration", msg)
5✔
66
                rollback(conn, msg)
5✔
67

68
              {:error, exception} ->
69
                msg =
×
70
                  "Unable to subscribe to changes with given parameters. An exception happened so please check your connect parameters: [#{params_to_log(params)}]. Exception: #{Exception.message(exception)}"
×
71

72
                log_error("RealtimeSubscriptionError", msg)
×
73

74
                rollback(conn, msg)
×
75
            end
76

77
          {:error, reason} ->
78
            rollback(conn, reason)
5✔
79
        end
80
      end)
81
    end)
82
  end
83

84
  defp params_to_log(map) do
85
    map
86
    |> Map.to_list()
87
    |> Enum.map_join(", ", fn {k, v} -> "#{k}: #{to_log(v)}" end)
5✔
88
  end
89

90
  @spec delete(conn(), String.t()) :: any()
91
  def delete(conn, id) do
92
    Logger.debug("Delete subscription")
1✔
93
    sql = "delete from realtime.subscription where subscription_id = $1"
1✔
94
    # TODO: connection can be not available
95
    {:ok, _} = query(conn, sql, [id])
1✔
96
  end
97

98
  @spec delete_all(conn()) :: {:ok, Postgrex.Result.t()} | {:error, Exception.t()}
99
  def delete_all(conn) do
100
    Logger.debug("Delete all subscriptions")
19✔
101
    query(conn, "delete from realtime.subscription;", [])
19✔
102
  end
103

104
  @spec delete_multi(conn(), [Ecto.UUID.t()]) ::
105
          {:ok, Postgrex.Result.t()} | {:error, Exception.t()}
106
  def delete_multi(conn, ids) do
107
    Logger.debug("Delete multi ids subscriptions")
3✔
108
    sql = "delete from realtime.subscription where subscription_id = ANY($1::uuid[])"
3✔
109
    query(conn, sql, [ids])
3✔
110
  end
111

112
  @spec maybe_delete_all(conn()) :: {:ok, Postgrex.Result.t()} | {:error, Exception.t()}
113
  def maybe_delete_all(conn) do
114
    query(
7✔
115
      conn,
116
      "do $$
117
        begin
118
          if exists (
119
            select 1
120
            from pg_tables
121
            where schemaname = 'realtime'
122
              and tablename  = 'subscription'
123
          )
124
          then
125
            delete from realtime.subscription;
126
          end if;
127
      end $$",
128
      []
129
    )
130
  end
131

132
  @spec fetch_publication_tables(conn(), String.t()) ::
133
          %{
134
            {<<_::1>>} => [integer()],
135
            {String.t()} => [integer()],
136
            {String.t(), String.t()} => [integer()]
137
          }
138
          | %{}
139
  def fetch_publication_tables(conn, publication) do
140
    sql = "select
28✔
141
    schemaname, tablename, format('%I.%I', schemaname, tablename)::regclass as oid
142
    from pg_publication_tables where pubname = $1"
143

144
    case query(conn, sql, [publication]) do
28✔
145
      {:ok, %{columns: ["schemaname", "tablename", "oid"], rows: rows}} ->
146
        Enum.reduce(rows, %{}, fn [schema, table, oid], acc ->
147
          if String.contains?(table, " ") do
479✔
148
            log_error(
×
149
              "TableHasSpacesInName",
150
              "Table name cannot have spaces: \"#{schema}\".\"#{table}\""
×
151
            )
152
          end
153

154
          Map.put(acc, {schema, table}, [oid])
155
          |> Map.update({schema}, [oid], &[oid | &1])
318✔
156
          |> Map.update({"*"}, [oid], &[oid | &1])
479✔
157
        end)
158
        |> Enum.reduce(%{}, fn {k, v}, acc -> Map.put(acc, k, Enum.sort(v)) end)
28✔
159

160
      _ ->
161
        %{}
×
162
    end
163
  end
164

165
  @doc """
166
  Parses subscription filter parameters into something we can pass into our `create_subscription` query.
167

168
  We currently support the following filters: 'eq', 'neq', 'lt', 'lte', 'gt', 'gte', 'in'
169

170
  ## Examples
171

172
      iex> params = %{"schema" => "public", "table" => "messages", "filter" => "subject=eq.hey"}
173
      iex> Extensions.PostgresCdcRls.Subscriptions.parse_subscription_params(params)
174
      {:ok, ["public", "messages", [{"subject", "eq", "hey"}]]}
175

176
  `in` filter:
177

178
      iex> params = %{"schema" => "public", "table" => "messages", "filter" => "subject=in.(hidee,ho)"}
179
      iex> Extensions.PostgresCdcRls.Subscriptions.parse_subscription_params(params)
180
      {:ok, ["public", "messages", [{"subject", "in", "{hidee,ho}"}]]}
181

182
  An unsupported filter will respond with an error tuple:
183

184
      iex> params = %{"schema" => "public", "table" => "messages", "filter" => "subject=like.hey"}
185
      iex> Extensions.PostgresCdcRls.Subscriptions.parse_subscription_params(params)
186
      {:error, ~s(Error parsing `filter` params: ["like", "hey"])}
187

188
  Catch `undefined` filters:
189

190
      iex> params = %{"schema" => "public", "table" => "messages", "filter" => "undefined"}
191
      iex> Extensions.PostgresCdcRls.Subscriptions.parse_subscription_params(params)
192
      {:error, ~s(Error parsing `filter` params: ["undefined"])}
193

194
  """
195

196
  @spec parse_subscription_params(map()) :: {:ok, list} | {:error, binary()}
197
  def parse_subscription_params(params) do
198
    case params do
37✔
199
      %{"schema" => schema, "table" => table, "filter" => filter} ->
200
        with [col, rest] <- String.split(filter, "=", parts: 2),
4✔
201
             [filter_type, value] when filter_type in @filter_types <-
3✔
202
               String.split(rest, ".", parts: 2),
203
             {:ok, formatted_value} <- format_filter_value(filter_type, value) do
2✔
204
          {:ok, [schema, table, [{col, filter_type, formatted_value}]]}
205
        else
206
          {:error, msg} ->
×
207
            {:error, "Error parsing `filter` params: #{msg}"}
×
208

209
          e ->
2✔
210
            {:error, "Error parsing `filter` params: #{inspect(e)}"}
211
        end
212

213
      %{"schema" => schema, "table" => table} ->
1✔
214
        {:ok, [schema, table, []]}
215

216
      %{"schema" => schema} ->
27✔
217
        {:ok, [schema, "*", []]}
218

219
      %{"table" => table} ->
×
220
        {:ok, ["public", table, []]}
221

222
      map when is_map_key(map, "user_token") or is_map_key(map, "auth_token") ->
2✔
223
        {:error,
224
         "No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: <redacted>"}
225

226
      error ->
3✔
227
        {:error,
228
         "No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: #{inspect(error)}"}
229
    end
230
  end
231

232
  defp format_filter_value(filter, value) do
233
    case filter do
2✔
234
      "in" ->
235
        case Regex.run(~r/^\((.*)\)$/, value) do
1✔
236
          nil ->
×
237
            {:error, "`in` filter value must be wrapped by parentheses"}
238

239
          [_, new_value] ->
1✔
240
            {:ok, "{#{new_value}}"}
1✔
241
        end
242

243
      _ ->
1✔
244
        {:ok, value}
245
    end
246
  end
247
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc