• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / 244c2a23598a309d81463f64694b4ca9d57cf06d

31 Jan 2025 10:02PM UTC coverage: 80.433% (-0.3%) from 80.76%
244c2a23598a309d81463f64694b4ca9d57cf06d

push

github

web-flow
fix: use ets for janitor instead of syn (#1290)

use ets for janitor instead of syn

2 of 2 new or added lines in 2 files covered. (100.0%)

7 existing lines in 2 files now uncovered.

1673 of 2080 relevant lines covered (80.43%)

366.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.82
/lib/extensions/postgres_cdc_rls/subscriptions_checker.ex
1
defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do
2
  @moduledoc false
3
  use GenServer
4
  require Logger
5
  import Realtime.Logs
6
  alias Extensions.PostgresCdcRls, as: Rls
7

8
  alias Realtime.Database
9
  alias Realtime.Helpers
10
  alias Realtime.Rpc
11
  alias Realtime.Telemetry
12

13
  alias Rls.Subscriptions
14

15
  # Delay in milliseconds used when scheduling the :check_active_pids message.
  @timeout 120_000
16
  # Maximum number of ids taken from the delete queue in one :check_delete_queue pass.
  @max_delete_records 1000
17

18
  defmodule State do
    @moduledoc false

    # State held by the subscriptions checker process:
    #   * id                - tenant identifier read from the "id" start argument
    #   * conn              - database connection opened in handle_continue/2
    #   * check_active_pids - timer reference of the pending :check_active_pids message
    #   * subscribers_tid   - ETS table that holds the subscriber entries
    #   * delete_queue      - ids waiting to be deleted, together with the timer
    #                         reference of the pending :check_delete_queue message
    defstruct [:id, :conn, :check_active_pids, :subscribers_tid, :delete_queue]

    @type t :: %__MODULE__{
            id: String.t(),
            conn: Postgrex.conn(),
            check_active_pids: reference(),
            subscribers_tid: :ets.tid(),
            delete_queue: %{
              # The state is initialised with `ref: nil`; a reference is only
              # stored once a :check_delete_queue timer has been scheduled.
              ref: reference() | nil,
              queue: :queue.queue()
            }
          }
  end
33

34
  @doc """
  Starts the checker process.

  `opts` is passed unchanged to `init/1` as the start argument. `init/1` matches
  it as a map containing the `"id"` key, and the connect step additionally reads
  `"subscribers_tid"`, so the argument is a map rather than GenServer options.
  """
  @spec start_link(map()) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end
38

39
  ## Callbacks
40

41
  # Tags the logger metadata of this process with the tenant id and hands the
  # start arguments over to handle_continue/2, where the connection is opened.
  @impl true
  def init(args) do
    %{"id" => tenant_id} = args

    [external_id: tenant_id, project: tenant_id]
    |> Logger.metadata()

    continuation = {:continue, {:connect, args}}
    {:ok, nil, continuation}
  end
47

48
  # Opens the database connection described by the start arguments and builds
  # the initial state: the first :check_active_pids timer is scheduled and the
  # delete queue starts empty with no timer reference.
  @impl true
  def handle_continue({:connect, args}, _state) do
    %{"id" => tenant_id, "subscribers_tid" => tid} = args

    {:ok, conn} =
      args
      |> Database.from_settings("realtime_subscription_checker")
      |> Database.connect_db()

    {:noreply,
     %State{
       id: tenant_id,
       conn: conn,
       check_active_pids: check_active_pids(),
       subscribers_tid: tid,
       delete_queue: %{ref: nil, queue: :queue.new()}
     }}
  end
70

71
  # Periodic sweep for subscribers whose process is no longer alive.
  #
  # Cancels the timer that triggered this sweep, groups the subscriber pids by
  # node, asks each node which pids are not alive, removes those pids from the
  # ETS table and collects the ids returned for them. Ids not already queued are
  # appended to the delete queue and a :check_delete_queue message is scheduled.
  # The sweep always reschedules itself before returning.
  @impl true
  def handle_info(
        :check_active_pids,
        %State{check_active_pids: ref, subscribers_tid: tid, delete_queue: delete_queue, id: id} =
          state
      ) do
    Helpers.cancel_timer(ref)

    phantom_ids =
      tid
      |> subscribers_by_node()
      |> not_alive_pids_dist()
      |> pop_not_alive_pids(tid, id)

    new_delete_queue =
      case phantom_ids do
        # Nothing to delete: keep the queue and its timer reference untouched.
        [] ->
          delete_queue

        # Match on the list shape instead of computing length/1 just to test
        # for emptiness.
        [_ | _] ->
          queue =
            Enum.reduce(phantom_ids, delete_queue.queue, fn phantom_id, acc ->
              # Skip ids that are already waiting to be deleted.
              if :queue.member(phantom_id, acc), do: acc, else: :queue.in(phantom_id, acc)
            end)

          %{ref: check_delete_queue(), queue: queue}
      end

    {:noreply, %{state | check_active_pids: check_active_pids(), delete_queue: new_delete_queue}}
  end
102

103
  # Processes one batch of the delete queue.
  #
  # Cancels the timer that triggered this pass, takes up to @max_delete_records
  # ids from the queue and deletes them through Subscriptions.delete_multi/2.
  # When the deletion fails the original queue is kept, so the same ids are
  # tried again on the next pass. Another pass is scheduled only while the
  # resulting queue is not empty; otherwise the previous reference is kept.
  def handle_info(:check_delete_queue, %State{delete_queue: %{ref: ref, queue: q}} = state) do
    Helpers.cancel_timer(ref)

    remaining =
      if :queue.is_empty(q) do
        q
      else
        {batch, rest} = Helpers.queue_take(q, @max_delete_records)
        Logger.warning("Delete #{length(batch)} phantom subscribers from db")

        case Subscriptions.delete_multi(state.conn, batch) do
          {:ok, _} ->
            rest

          {:error, reason} ->
            log_error("UnableToDeletePhantomSubscriptions", reason)
            q
        end
      end

    next_ref = if :queue.is_empty(remaining), do: ref, else: check_delete_queue()

    {:noreply, %{state | delete_queue: %{ref: next_ref, queue: remaining}}}
  end
128

129
  ## Internal functions
130

131
  @doc """
  Removes the given pids from the ETS table `tid` and returns the ids stored for them.

  For every pid the table is looked up:

    * when no entry exists, a `:pid_not_found` telemetry event is emitted and
      nothing is added to the result;
    * otherwise each `{pid, postgres_id, ref, node}` entry emits a
      `:phantom_pid_detected` telemetry event, the pid is deleted from the
      table, and `postgres_id` is converted with `UUID.string_to_binary!/1`.

  Ids collected for later pids are placed in front of those collected earlier.
  """
  @spec pop_not_alive_pids([pid()], :ets.tid(), binary()) :: [Ecto.UUID.t()]
  def pop_not_alive_pids(pids, tid, tenant_id) do
    Enum.reduce(pids, [], fn pid, collected ->
      entries = :ets.lookup(tid, pid)

      if entries == [] do
        Telemetry.execute(
          [:realtime, :subscriptions_checker, :pid_not_found],
          %{quantity: 1},
          %{tenant_id: tenant_id}
        )

        collected
      else
        found =
          for {^pid, postgres_id, _ref, _node} <- entries do
            Telemetry.execute(
              [:realtime, :subscriptions_checker, :phantom_pid_detected],
              %{quantity: 1},
              %{tenant_id: tenant_id}
            )

            :ets.delete(tid, pid)
            UUID.string_to_binary!(postgres_id)
          end

        found ++ collected
      end
    end)
  end
158

159
  @doc """
  Groups the pids stored in the ETS table `tid` by the node recorded in each entry.

  Every entry is expected to be a `{pid, postgres_id, ref, node}` tuple. The
  result maps each node to the `MapSet` of pids found for it; an empty table
  yields an empty map.
  """
  @spec subscribers_by_node(:ets.tid()) :: %{node() => MapSet.t(pid())}
  def subscribers_by_node(tid) do
    :ets.foldl(
      fn {pid, _postgres_id, _ref, node}, grouped ->
        Map.update(grouped, node, MapSet.new([pid]), &MapSet.put(&1, pid))
      end,
      %{},
      tid
    )
  end
168

169
  @doc """
  Collects, across nodes, the pids that are reported as not alive.

  `pids` maps each node to the set of pids to check. Pids of the local node are
  checked in this process with `not_alive_pids/1`; pids of any other node are
  checked by calling `not_alive_pids/1` on that node through `Rpc.call/5` with a
  15 second timeout. When the remote call returns `{:badrpc, _}` the error is
  logged and that node contributes nothing to the result.
  """
  @spec not_alive_pids_dist(%{node() => MapSet.t(pid())}) :: [pid()] | []
  def not_alive_pids_dist(pids) do
    Enum.reduce(pids, [], fn
      {node, node_pids}, acc when node == node() ->
        acc ++ not_alive_pids(node_pids)

      {node, node_pids}, acc ->
        case Rpc.call(node, __MODULE__, :not_alive_pids, [node_pids], timeout: 15_000) do
          {:badrpc, _} = error ->
            log_error("UnableToCheckProcessesOnRemoteNode", error)
            acc

          dead_pids ->
            acc ++ dead_pids
        end
    end)
  end
186

187
  @doc """
  Returns the pids from `pids` for which `Process.alive?/1` is false.

  Each such pid is prepended to the result, so the list is in the reverse of
  the order in which `pids` is enumerated.
  """
  @spec not_alive_pids(MapSet.t(pid())) :: [pid()] | []
  def not_alive_pids(pids) do
    for pid <- pids, not Process.alive?(pid), reduce: [] do
      dead -> [pid | dead]
    end
  end
191

192
  # Schedules a :check_delete_queue message to this process in 1000 ms and
  # returns the timer reference.
  defp check_delete_queue do
    Process.send_after(self(), :check_delete_queue, 1000)
  end
×
193

194
  # Schedules a :check_active_pids message to this process after @timeout ms
  # and returns the timer reference.
  defp check_active_pids do
    Process.send_after(self(), :check_active_pids, @timeout)
  end
3✔
195
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc