• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / c5c8f617a3ad63a2c3506a1b61ecd513ebad1ed3

07 May 2025 08:44PM UTC coverage: 82.461% (+0.4%) from 82.079%
c5c8f617a3ad63a2c3506a1b61ecd513ebad1ed3

push

github

web-flow
fix: remove region from syn conflict handling & non found process on register_process (#1363)

7 of 10 new or added lines in 4 files covered. (70.0%)

8 existing lines in 3 files now uncovered.

1749 of 2121 relevant lines covered (82.46%)

1349.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.19
/lib/realtime/tenants/connect.ex
1
defmodule Realtime.Tenants.Connect do
  @moduledoc """
  This module is responsible for attempting to connect to a tenant's database and store the DBConnection in a Syn registry.

  Each tenant has at most one `Connect` process, registered in `:syn` under this module's scope with the
  tenant's external id as key. The process registers itself with the metadata `%{conn: nil}`;
  `get_status/1` only reports the tenant as connected once that metadata carries a connection.

  ## Options

  * `:check_connected_user_interval` - The interval in milliseconds between checks for connected users on the tenant's channels. Once six consecutive checks have all counted zero connected users, the connection is stopped one interval later. Defaults to `50_000`.
  * `:rpc_timeout` - The timeout in milliseconds for the RPC call that starts the connection on the tenant's node (see `lookup_or_start_connection/2`). Defaults to `30_000`.
  """
  use GenServer, restart: :transient

  require Logger

  import Realtime.Logs

  alias Realtime.Api.Tenant
  alias Realtime.Rpc
  alias Realtime.Tenants
  alias Realtime.Tenants.Connect.Backoff
  alias Realtime.Tenants.Connect.CheckConnection
  alias Realtime.Tenants.Connect.GetTenant
  alias Realtime.Tenants.Connect.Piper
  alias Realtime.Tenants.Connect.RegisterProcess
  alias Realtime.Tenants.Connect.StartCounters
  alias Realtime.Tenants.Listen
  alias Realtime.Tenants.Migrations
  alias Realtime.Tenants.ReplicationConnection
  alias Realtime.UsersCounter

  @rpc_timeout_default 30_000
  @check_connected_user_interval_default 50_000
  # When the sliding window of the most recent user counts equals this list, the connection is shut down.
  @connected_users_bucket_shutdown [0, 0, 0, 0, 0, 0]
  # Size of that sliding window; derived so the two can never drift apart.
  @connected_users_bucket_size length(@connected_users_bucket_shutdown)

  defstruct tenant_id: nil,
            # Read by handle_continue/2; filled in by the connect pipeline run in init/1
            # (presumably by the GetTenant step).
            tenant: nil,
            db_conn_reference: nil,
            db_conn_pid: nil,
            replication_connection_pid: nil,
            replication_connection_reference: nil,
            listen_pid: nil,
            listen_reference: nil,
            check_connected_user_interval: nil,
            # Seeded with a non-zero count so that six real zero-user checks are needed
            # before the window can match @connected_users_bucket_shutdown.
            connected_users_bucket: [1]

  @doc """
  Returns the database connection for a tenant. If the tenant is not connected, it will attempt to connect to the tenant's database.

  The local `:syn` registry is consulted first (see `get_status/1`):

    * a registered process with a connection returns `{:ok, conn}`;
    * a registered process without a connection yet returns `{:error, :tenant_database_unavailable}`;
    * when no process is registered, this waits 100ms and then starts the connection on the node
      that owns the tenant by calling `connect/2` over RPC;
    * when the registry lookup itself fails, the RPC call is made without waiting.

  Suspended tenants are rejected with `{:error, :tenant_suspended}` before any RPC is made. Errors
  from node resolution, the RPC call and `connect/2` are returned as-is.

  ## Options

    * `:rpc_timeout` - timeout in milliseconds for the RPC call. Defaults to `30_000`.

  The whole option list is also forwarded to `connect/2` on the target node.
  """
  @spec lookup_or_start_connection(binary(), keyword()) ::
          {:ok, pid()}
          | {:error, :tenant_database_unavailable}
          | {:error, :initializing}
          | {:error, :tenant_database_connection_initializing}
          | {:error, :rpc_error, term()}
          | {:error, term()}
  def lookup_or_start_connection(tenant_id, opts \\ []) when is_binary(tenant_id) do
    case get_status(tenant_id) do
      {:ok, conn} ->
        {:ok, conn}

      {:error, :tenant_database_unavailable} ->
        call_external_node(tenant_id, opts)

      {:error, :tenant_database_connection_initializing} ->
        # No process is registered yet; give a concurrent start a moment before asking the owner node.
        Process.sleep(100)
        call_external_node(tenant_id, opts)

      {:error, :initializing} ->
        # A process is registered but has no connection yet; callers see the database as unavailable.
        {:error, :tenant_database_unavailable}
    end
  end

  @doc """
  Returns the database connection pid from :syn if it exists.

    * `{:ok, conn}` - a process is registered and its metadata holds a connection.
    * `{:error, :initializing}` - a process is registered but its metadata connection is still `nil`.
    * `{:error, :tenant_database_connection_initializing}` - no process is registered (a warning is logged).
    * `{:error, :tenant_database_unavailable}` - the lookup returned anything else (logged as an error).
  """
  @spec get_status(binary()) ::
          {:ok, pid()}
          | {:error, :tenant_database_unavailable}
          | {:error, :initializing}
          | {:error, :tenant_database_connection_initializing}
  def get_status(tenant_id) do
    case :syn.lookup(__MODULE__, tenant_id) do
      {_, %{conn: nil}} ->
        {:error, :initializing}

      {_, %{conn: conn}} ->
        {:ok, conn}

      :undefined ->
        Logger.warning("Connection process starting up")
        {:error, :tenant_database_connection_initializing}

      error ->
        log_error("SynInitializationError", error)
        {:error, :tenant_database_unavailable}
    end
  end

  @doc """
  Connects to a tenant's database and stores the DBConnection in the process :syn metadata.

  Starts (or finds the already started) `Connect` process for `tenant_id` under a partitioned
  dynamic supervisor and then returns `get_status/1` for the tenant. Start failures are mapped to:

    * `{:error, :tenant_db_too_many_connections}`
    * `{:error, :tenant_not_found}`
    * `{:error, :tenant_create_backoff}` (logged as a warning)
    * `{:error, :tenant_database_unavailable}` for any other failure (logged as an error)

  `opts` are passed to `start_link/1` (see the module documentation for the supported options).
  """
  @spec connect(binary(), keyword()) :: {:ok, DBConnection.t()} | {:error, term()}
  def connect(tenant_id, opts \\ []) do
    # The PartitionSupervisor routes the child start to a partition chosen by tenant_id.
    supervisor =
      {:via, PartitionSupervisor, {Realtime.Tenants.Connect.DynamicSupervisor, tenant_id}}

    spec = {__MODULE__, [tenant_id: tenant_id] ++ opts}

    case DynamicSupervisor.start_child(supervisor, spec) do
      {:ok, _} ->
        get_status(tenant_id)

      {:error, {:already_started, _}} ->
        get_status(tenant_id)

      {:error, {:shutdown, :tenant_db_too_many_connections}} ->
        {:error, :tenant_db_too_many_connections}

      {:error, {:shutdown, :tenant_not_found}} ->
        {:error, :tenant_not_found}

      {:error, {:shutdown, :tenant_create_backoff}} ->
        log_warning("TooManyConnectAttempts", "Too many connect attempts to tenant database")
        {:error, :tenant_create_backoff}

      {:error, :shutdown} ->
        log_error("UnableToConnectToTenantDatabase", "Unable to connect to tenant database")
        {:error, :tenant_database_unavailable}

      {:error, error} ->
        log_error("UnableToConnectToTenantDatabase", error)
        {:error, :tenant_database_unavailable}
    end
  end

  @doc """
  Returns the pid of the tenant Connection process registered in `:syn`, or `nil` when there is none.
  """
  @spec whereis(binary()) :: pid() | nil
  def whereis(tenant_id) do
    case :syn.lookup(__MODULE__, tenant_id) do
      {pid, _} when is_pid(pid) -> pid
      _ -> nil
    end
  end

  @doc """
  Shutdown the tenant Connection and linked processes.

  Asynchronously asks the registered `Connect` process (if any) to shut down. Always returns `:ok`.
  """
  @spec shutdown(binary()) :: :ok
  def shutdown(tenant_id) do
    case whereis(tenant_id) do
      pid when is_pid(pid) -> send(pid, :shutdown_connect)
      _ -> :noop
    end

    :ok
  end

  @doc false
  # Registers the process in :syn under {__MODULE__, tenant_id} with metadata %{conn: nil}.
  def start_link(opts) do
    tenant_id = Keyword.get(opts, :tenant_id)

    check_connected_user_interval =
      Keyword.get(opts, :check_connected_user_interval, @check_connected_user_interval_default)

    name = {__MODULE__, tenant_id, %{conn: nil}}

    state = %__MODULE__{
      tenant_id: tenant_id,
      check_connected_user_interval: check_connected_user_interval
    }

    opts = Keyword.put(opts, :name, {:via, :syn, name})

    GenServer.start_link(__MODULE__, state, opts)
  end

  ## GenServer callbacks
  # Needs to be done on init/1 to guarantee the GenServer only starts if we are able to connect to the database
  @impl GenServer
  def init(%{tenant_id: tenant_id} = state) do
    Logger.metadata(external_id: tenant_id, project: tenant_id)

    pipes = [
      GetTenant,
      Backoff,
      CheckConnection,
      StartCounters,
      RegisterProcess
    ]

    case Piper.run(pipes, state) do
      {:ok, acc} ->
        {:ok, acc, {:continue, :run_migrations}}

      # {:shutdown, _} stop reasons are matched by connect/2 and do not trigger a transient restart.
      {:error, :tenant_not_found} ->
        {:stop, {:shutdown, :tenant_not_found}}

      {:error, :tenant_db_too_many_connections} ->
        {:stop, {:shutdown, :tenant_db_too_many_connections}}

      {:error, :tenant_create_backoff} ->
        {:stop, {:shutdown, :tenant_create_backoff}}

      {:error, error} ->
        log_error("UnableToConnectToTenantDatabase", error)
        {:stop, :shutdown}
    end
  end

  # Post-init steps, chained in order:
  # :run_migrations -> :start_listen_and_replication -> :setup_connected_user_events
  @impl GenServer
  def handle_continue(:run_migrations, state) do
    %{tenant: tenant, db_conn_pid: db_conn_pid} = state
    Logger.warning("Tenant #{tenant.external_id} is initializing: #{inspect(node())}")

    with res when res in [:ok, :noop] <- Migrations.run_migrations(tenant),
         :ok <- Migrations.create_partitions(db_conn_pid) do
      {:noreply, state, {:continue, :start_listen_and_replication}}
    else
      error ->
        log_error("MigrationsFailedToRun", error)
        {:stop, :shutdown, state}
    end
  rescue
    error ->
      log_error("MigrationsFailedToRun", error)
      {:stop, :shutdown, state}
  end

  def handle_continue(:start_listen_and_replication, state) do
    %{tenant: tenant} = state

    with {:ok, replication_connection_pid} <- ReplicationConnection.start(tenant, self()),
         {:ok, listen_pid} <- Listen.start(tenant, self()) do
      # Monitored so that the death of either process stops this one (see the :DOWN clauses).
      replication_connection_reference = Process.monitor(replication_connection_pid)
      listen_reference = Process.monitor(listen_pid)

      state = %{
        state
        | replication_connection_pid: replication_connection_pid,
          replication_connection_reference: replication_connection_reference,
          listen_pid: listen_pid,
          listen_reference: listen_reference
      }

      {:noreply, state, {:continue, :setup_connected_user_events}}
    else
      {:error, :max_wal_senders_reached} ->
        log_error("ReplicationMaxWalSendersReached", "Tenant database has reached the maximum number of WAL senders")
        {:stop, :shutdown, state}

      {:error, error} ->
        log_error("StartListenAndReplicationFailed", error)
        {:stop, :shutdown, state}
    end
  rescue
    error ->
      log_error("StartListenAndReplicationFailed", error)
      {:stop, :shutdown, state}
  end

  def handle_continue(:setup_connected_user_events, state) do
    %{
      check_connected_user_interval: check_connected_user_interval,
      connected_users_bucket: connected_users_bucket,
      tenant_id: tenant_id
    } = state

    :ok = Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:" <> tenant_id)
    send_connected_user_check_message(connected_users_bucket, check_connected_user_interval)
    # Records this tenant in the ETS table named after this module.
    :ets.insert(__MODULE__, {tenant_id})
    {:noreply, state}
  end

  @impl GenServer
  def handle_info(
        :check_connected_users,
        %{
          tenant_id: tenant_id,
          check_connected_user_interval: check_connected_user_interval,
          connected_users_bucket: connected_users_bucket
        } = state
      ) do
    connected_users_bucket =
      tenant_id
      |> update_connected_users_bucket(connected_users_bucket)
      |> send_connected_user_check_message(check_connected_user_interval)

    {:noreply, %{state | connected_users_bucket: connected_users_bucket}}
  end

  def handle_info(:shutdown_no_connected_users, state) do
    Logger.info("Tenant has no connected users, database connection will be terminated")
    shutdown_connect_process(state)
  end

  def handle_info(:suspend_tenant, state) do
    Logger.warning("Tenant was suspended, database connection will be terminated")
    shutdown_connect_process(state)
  end

  def handle_info(:shutdown_connect, state) do
    Logger.warning("Shutdowning tenant connection")
    shutdown_connect_process(state)
  end

  # Handle database connection termination
  def handle_info(
        {:DOWN, db_conn_reference, _, _, _},
        %{db_conn_reference: db_conn_reference} = state
      ) do
    Logger.warning("Database connection has been terminated")
    {:stop, :shutdown, state}
  end

  # Handle replication connection termination
  def handle_info(
        {:DOWN, replication_connection_reference, _, _, _},
        %{replication_connection_reference: replication_connection_reference} = state
      ) do
    Logger.warning("Replication connection has died")
    {:stop, :shutdown, state}
  end

  # Handle listen connection termination
  def handle_info(
        {:DOWN, listen_reference, _, _, _},
        %{listen_reference: listen_reference} = state
      ) do
    Logger.warning("Listen has been terminated")
    {:stop, :shutdown, state}
  end

  # Ignore messages to avoid handle_info unmatched functions
  def handle_info(_, state) do
    {:noreply, state}
  end

  @impl GenServer
  def terminate(reason, %{tenant_id: tenant_id}) do
    Logger.info("Tenant #{tenant_id} has been terminated: #{inspect(reason)}")
    Realtime.MetricsCleaner.delete_metric(tenant_id)
    :ok
  end

  ## Private functions

  # Resolves the node that owns the tenant and runs connect/2 there over RPC.
  defp call_external_node(tenant_id, opts) do
    rpc_timeout = Keyword.get(opts, :rpc_timeout, @rpc_timeout_default)
    # NOTE(review): the cache may return nil for unknown tenants; nil passes tenant_suspended?/1
    # and is handed to Nodes.get_node_for_tenant/1 — confirm that returns an error tuple for nil.
    tenant = Tenants.Cache.get_tenant_by_external_id(tenant_id)

    with :ok <- tenant_suspended?(tenant),
         {:ok, node} <- Realtime.Nodes.get_node_for_tenant(tenant) do
      Rpc.enhanced_call(node, __MODULE__, :connect, [tenant_id, opts], timeout: rpc_timeout, tenant: tenant_id)
    end
  end

  # Appends the current user count and keeps only the most recent @connected_users_bucket_size counts.
  defp update_connected_users_bucket(tenant_id, connected_users_bucket) do
    current_users = UsersCounter.tenant_users(tenant_id)
    Enum.take(connected_users_bucket ++ [current_users], -@connected_users_bucket_size)
  end

  # Schedules the next step of the idle check and returns the bucket unchanged, so callers can
  # store the result in the state in every case (a window of only zeros schedules the shutdown).
  defp send_connected_user_check_message(
         @connected_users_bucket_shutdown = connected_users_bucket,
         check_connected_user_interval
       ) do
    Process.send_after(self(), :shutdown_no_connected_users, check_connected_user_interval)
    connected_users_bucket
  end

  defp send_connected_user_check_message(connected_users_bucket, check_connected_user_interval) do
    Process.send_after(self(), :check_connected_users, check_connected_user_interval)
    connected_users_bucket
  end

  defp tenant_suspended?(%Tenant{suspend: true}), do: {:error, :tenant_suspended}
  defp tenant_suspended?(_), do: :ok

  # Stops the replication and listen processes and the database connection, then stops this process.
  defp shutdown_connect_process(state) do
    %{
      db_conn_pid: db_conn_pid,
      replication_connection_pid: replication_connection_pid,
      listen_pid: listen_pid
    } = state

    # The auxiliary processes are stopped before the database connection. NOTE(review): stopping
    # the database connection with a :shutdown reason can also terminate this process if the two
    # are linked, in which case nothing after that call would run — confirm how CheckConnection
    # starts the connection.
    stop_if_alive(replication_connection_pid, :normal)
    stop_if_alive(listen_pid, :normal)

    :ok = GenServer.stop(db_conn_pid, :shutdown, 500)

    {:stop, :normal, state}
  end

  # Best-effort stop of an optional process (nil or already dead is fine). An exit from
  # GenServer.stop/3 (e.g. :noproc race or timeout) is logged instead of crashing this
  # process, which would otherwise count as an abnormal exit for the :transient restart.
  defp stop_if_alive(pid, reason) when is_pid(pid) do
    if Process.alive?(pid), do: GenServer.stop(pid, reason, 500)
    :ok
  catch
    :exit, exit_reason ->
      Logger.warning("Unable to stop process #{inspect(pid)}: #{inspect(exit_reason)}")
      :ok
  end

  defp stop_if_alive(_pid, _reason), do: :ok
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc