• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / b342f86940ffa4da329270b1300d35a68b357976-PR-1573

14 Oct 2025 09:54PM UTC coverage: 85.669% (+0.4%) from 85.3%
b342f86940ffa4da329270b1300d35a68b357976-PR-1573

Pull #1573

github

edgurgel
fix: if ets table is not there fallback to broadcast to cluster
Pull Request #1573: feat: selective broadcast for postgres changes

42 of 45 new or added lines in 6 files covered. (93.33%)

10 existing lines in 6 files now uncovered.

2152 of 2512 relevant lines covered (85.67%)

2733.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

42.31
/lib/extensions/postgres_cdc_rls/subscriptions_checker.ex
1
defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do
2
  @moduledoc false
3
  use GenServer
4
  use Realtime.Logs
5

6
  alias Extensions.PostgresCdcRls, as: Rls
7

8
  alias Realtime.Database
9
  alias Realtime.Helpers
10
  alias Realtime.Rpc
11
  alias Realtime.Telemetry
12

13
  alias Rls.Subscriptions
14

15
  @timeout 120_000
16
  @max_delete_records 1000
17

18
  defmodule State do
    @moduledoc false

    # Runtime state of the subscriptions checker process.
    #
    #   * `:id` - tenant identifier read from the `"id"` start argument
    #   * `:conn` - database connection used to delete phantom subscriptions
    #   * `:check_active_pids` - timer reference of the next `:check_active_pids` sweep
    #   * `:subscribers_pids_table` - ETS table whose rows are read as
    #     `{pid, postgres_id, ref, node}`
    #   * `:subscribers_nodes_table` - ETS table keyed by the binary form of `postgres_id`
    #   * `:delete_queue` - subscription ids waiting to be deleted, plus the timer
    #     reference of the next flush; `ref` is `nil` until a flush is first scheduled
    defstruct [
      :id,
      :conn,
      :check_active_pids,
      :subscribers_pids_table,
      :subscribers_nodes_table,
      :delete_queue
    ]

    @type t :: %__MODULE__{
            id: String.t(),
            conn: Postgrex.conn(),
            check_active_pids: reference(),
            subscribers_pids_table: :ets.tid(),
            subscribers_nodes_table: :ets.tid(),
            delete_queue: %{
              ref: reference() | nil,
              queue: :queue.queue()
            }
          }
  end
34

35
  @spec start_link(GenServer.options()) :: GenServer.on_start()
36
  # Starts the checker as a linked GenServer; `opts` is handed unchanged to `init/1`.
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
39

40
  ## Callbacks
41

42
  @impl true
43
  # Tags the process logs with the tenant id and defers the rest of the setup
  # to `handle_continue/2`. The state is `nil` until that step completes.
  def init(args) do
    %{"id" => tenant_id} = args

    metadata = [external_id: tenant_id, project: tenant_id]
    Logger.metadata(metadata)

    next_step = {:continue, {:connect, args}}
    {:ok, nil, next_step}
  end
48

49
  @impl true
50
  # Connects to the tenant database and builds the initial state.
  #
  # Crashes (match error) when the connection cannot be established. The first
  # `:check_active_pids` timer is only scheduled after the connection succeeds.
  def handle_continue({:connect, args}, _) do
    %{
      "id" => id,
      "subscribers_pids_table" => pids_table,
      "subscribers_nodes_table" => nodes_table
    } = args

    {:ok, conn} =
      args
      |> Database.from_settings("realtime_subscription_checker")
      |> Database.connect_db()

    # No flush is scheduled yet, so the delete queue starts without a timer ref.
    empty_delete_queue = %{ref: nil, queue: :queue.new()}

    {:noreply,
     %State{
       id: id,
       conn: conn,
       check_active_pids: check_active_pids(),
       subscribers_pids_table: pids_table,
       subscribers_nodes_table: nodes_table,
       delete_queue: empty_delete_queue
     }}
  end
76

77
  @impl true
78
  # Periodic liveness sweep.
  #
  # Collects the subscriber pids per node, asks each node which of them are no
  # longer alive, removes those from the ETS tables and enqueues the matching
  # subscription ids for deletion from the database. The sweep always
  # reschedules itself; a delete-queue flush is scheduled only when new ids
  # were found.
  def handle_info(:check_active_pids, %State{check_active_pids: ref, delete_queue: delete_queue, id: id} = state) do
    Helpers.cancel_timer(ref)

    ids =
      state.subscribers_pids_table
      |> subscribers_by_node()
      |> not_alive_pids_dist()
      |> pop_not_alive_pids(state.subscribers_pids_table, state.subscribers_nodes_table, id)

    new_delete_queue =
      case ids do
        [] ->
          delete_queue

        [_ | _] ->
          # Skip ids that are already queued so each one is deleted only once.
          queue =
            Enum.reduce(ids, delete_queue.queue, fn subscription_id, acc ->
              if :queue.member(subscription_id, acc), do: acc, else: :queue.in(subscription_id, acc)
            end)

          %{ref: check_delete_queue(), queue: queue}
      end

    {:noreply, %{state | check_active_pids: check_active_pids(), delete_queue: new_delete_queue}}
  end
104

105
  # Flushes up to `@max_delete_records` queued subscription ids to the database.
  #
  # On success the flushed ids leave the queue; on failure the queue is kept
  # untouched so the same ids are retried. Another flush is scheduled while the
  # queue still holds entries.
  def handle_info(:check_delete_queue, %State{delete_queue: %{ref: ref, queue: q}} = state) do
    Helpers.cancel_timer(ref)

    remaining =
      case :queue.is_empty(q) do
        true ->
          q

        false ->
          {batch, rest} = Helpers.queue_take(q, @max_delete_records)
          Logger.warning("Delete #{length(batch)} phantom subscribers from db")

          case Subscriptions.delete_multi(state.conn, batch) do
            {:ok, _} ->
              rest

            {:error, reason} ->
              log_error("UnableToDeletePhantomSubscriptions", reason)
              q
          end
      end

    next_ref =
      if :queue.is_empty(remaining) do
        ref
      else
        check_delete_queue()
      end

    {:noreply, %{state | delete_queue: %{ref: next_ref, queue: remaining}}}
  end
130

131
  ## Internal functions
132

133
  @spec pop_not_alive_pids([pid()], :ets.tid(), :ets.tid(), binary()) :: [Ecto.UUID.t()]
134
  # Removes every given pid from the subscriber tables and returns the binary
  # subscription ids that belonged to them.
  #
  # For each pid the pids table is looked up:
  #   * no rows: a `:pid_not_found` telemetry event is emitted and nothing is collected
  #   * rows of shape `{pid, postgres_id, ref, node}`: a `:phantom_pid_detected`
  #     event is emitted per row, the pid is deleted from the pids table and the
  #     binary form of `postgres_id` is deleted from the nodes table
  #
  # Ids of later pids are placed in front of ids collected from earlier pids.
  def pop_not_alive_pids(pids, subscribers_pids_table, subscribers_nodes_table, tenant_id) do
    Enum.reduce(pids, [], fn pid, collected ->
      case :ets.lookup(subscribers_pids_table, pid) do
        [] ->
          Telemetry.execute(
            [:realtime, :subscriptions_checker, :pid_not_found],
            %{quantity: 1},
            %{tenant_id: tenant_id}
          )

          collected

        rows ->
          removed_ids =
            for {^pid, postgres_id, _ref, _node} <- rows do
              Telemetry.execute(
                [:realtime, :subscriptions_checker, :phantom_pid_detected],
                %{quantity: 1},
                %{tenant_id: tenant_id}
              )

              :ets.delete(subscribers_pids_table, pid)

              binary_id = UUID.string_to_binary!(postgres_id)
              :ets.delete(subscribers_nodes_table, binary_id)

              binary_id
            end

          removed_ids ++ collected
      end
    end)
  end
163

164
  @spec subscribers_by_node(:ets.tid()) :: %{node() => MapSet.t(pid())}
165
  # Folds over the ETS table `tid`, whose rows are read as
  # `{pid, postgres_id, ref, node}`, and groups the pids into one `MapSet` per node.
  def subscribers_by_node(tid) do
    :ets.foldl(
      fn {pid, _postgres_id, _ref, node}, by_node ->
        Map.update(by_node, node, MapSet.new([pid]), &MapSet.put(&1, pid))
      end,
      %{},
      tid
    )
  end
173

174
  @spec not_alive_pids_dist(%{node() => MapSet.t(pid())}) :: [pid()] | []
175
  # Returns the pids that are no longer alive, given a map of node => pids.
  #
  # Pids of the local node are checked in-process; pids of other nodes are
  # checked via an RPC to `not_alive_pids/1` on the owning node (15s timeout).
  # When the RPC fails the error is logged and that node contributes no pids,
  # so its subscribers are left untouched in this sweep.
  #
  # Results are concatenated in the enumeration order of the input map.
  def not_alive_pids_dist(pids) do
    local_node = node()

    Enum.flat_map(pids, fn
      {^local_node, node_pids} ->
        not_alive_pids(node_pids)

      {remote_node, node_pids} ->
        case Rpc.call(remote_node, __MODULE__, :not_alive_pids, [node_pids], timeout: 15_000) do
          {:badrpc, _} = error ->
            log_error("UnableToCheckProcessesOnRemoteNode", error)
            []

          dead_pids ->
            dead_pids
        end
    end)
  end
191

192
  @spec not_alive_pids(MapSet.t(pid())) :: [pid()] | []
193
  # Returns the pids for which `Process.alive?/1` is false. Each such pid is
  # prepended to the result, so the output is in reverse enumeration order.
  def not_alive_pids(pids) do
    for pid <- pids, not Process.alive?(pid), reduce: [] do
      dead -> [pid | dead]
    end
  end
196

197
  # Schedules a `:check_delete_queue` message to this process after 1000 ms
  # and returns the timer reference.
  defp check_delete_queue do
    Process.send_after(self(), :check_delete_queue, 1000)
  end
×
198

199
  # Schedules a `:check_active_pids` message to this process after `@timeout`
  # milliseconds and returns the timer reference.
  defp check_active_pids do
    Process.send_after(self(), :check_active_pids, @timeout)
  end
31✔
200
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc