• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / 46460e2d809e9bc0a072465e48ba1468d46715d7-PR-1536

18 Sep 2025 09:37PM UTC coverage: 85.437% (+0.3%) from 85.184%
46460e2d809e9bc0a072465e48ba1468d46715d7-PR-1536

Pull #1536

github

edgurgel
fix: too many db connections multiple processes
Pull Request #1536: fix: match error on Connect

1 of 1 new or added line in 1 file covered. (100.0%)

1 existing line in 1 file now uncovered.

2071 of 2424 relevant lines covered (85.44%)

2269.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.62
/lib/realtime/tenants/connect.ex
1
defmodule Realtime.Tenants.Connect do
2
  @moduledoc """
3
  This module is responsible for attempting to connect to a tenant's database and store the DBConnection in a Syn registry.
4

5
  ## Options
6
  * `:check_connected_user_interval` - The interval in milliseconds to check if there are any connected users to a tenant channel. If there are no connected users, the connection will be stopped.
7
  * `:check_connect_region_interval` - The interval in milliseconds to check if this process is in the correct region. If the region is not correct it stops the connection.
8
  * `:rpc_timeout` - The timeout in milliseconds for the RPC call made to the node that starts the tenant's connection. Defaults to 30_000.
9
  """
10
  use GenServer, restart: :temporary
11

12
  use Realtime.Logs
13

14
  alias Realtime.Tenants.Rebalancer
15
  alias Realtime.Api.Tenant
16
  alias Realtime.Rpc
17
  alias Realtime.Tenants
18
  alias Realtime.Tenants.Connect.CheckConnection
19
  alias Realtime.Tenants.Connect.GetTenant
20
  alias Realtime.Tenants.Connect.Piper
21
  alias Realtime.Tenants.Connect.RegisterProcess
22
  alias Realtime.Tenants.Migrations
23
  alias Realtime.Tenants.ReplicationConnection
24
  alias Realtime.UsersCounter
25

26
  @rpc_timeout_default 30_000
27
  @check_connected_user_interval_default 50_000
28
  @connected_users_bucket_shutdown [0, 0, 0, 0, 0, 0]
29

30
  # State carried by the Connect GenServer.
  #
  # `tenant` is declared here because this module destructures it from the state
  # (in the `:run_migrations` continue step and in `start_replication_connection/1`).
  # NOTE(review): it is presumably populated by the `Piper` pipeline run in the
  # `:db_connect` continue step — confirm against the pipe modules.
  defstruct tenant_id: nil,
            tenant: nil,
            db_conn_reference: nil,
            db_conn_pid: nil,
            replication_connection_pid: nil,
            replication_connection_reference: nil,
            check_connected_user_interval: nil,
            connected_users_bucket: [1],
            check_connect_region_interval: nil
38

39
  @doc """
  Reports whether the Connect process of `tenant_id` answers the `:ready?` call.

  Returns `false` when no process is registered for the tenant; otherwise returns
  whatever the registered process replies to `:ready?`.
  """
  def ready?(tenant_id) do
    connect_pid = whereis(tenant_id)

    if is_pid(connect_pid) do
      GenServer.call(connect_pid, :ready?)
    else
      false
    end
  end
49

50
  @doc """
  Returns the database connection for a tenant, starting the connection when none is registered.

  The current status is read with `get_status/1` and handled as follows:

    * `{:ok, conn}` - the connection already exists and is returned.
    * `{:error, :tenant_database_connection_initializing}` - nothing is registered yet, so
      `connect/3` is invoked through an RPC call on the node selected for the tenant and the
      result of that call is returned. Suspended tenants are rejected before the call with
      `{:error, :tenant_suspended}`.
    * `{:error, :initializing}` - reported to the caller as `{:error, :tenant_database_unavailable}`.
    * `{:error, :tenant_database_unavailable}` and `{:error, :tenant_db_too_many_connections}`
      are returned unchanged.

  ## Options

    * `:rpc_timeout` - timeout in milliseconds for the RPC call. Defaults to 30_000.

  The full option list is also forwarded to `connect/3`.
  """
  @spec lookup_or_start_connection(binary(), keyword()) ::
          {:ok, pid()}
          | {:error, :tenant_database_unavailable}
          | {:error, :initializing}
          | {:error, :tenant_database_connection_initializing}
          | {:error, :tenant_db_too_many_connections}
          | {:error, :tenant_suspended}
          | {:error, :rpc_error, term()}
  def lookup_or_start_connection(tenant_id, opts \\ []) when is_binary(tenant_id) do
    case get_status(tenant_id) do
      {:ok, conn} ->
        {:ok, conn}

      # No Connect process is registered yet: start one on the tenant's node.
      {:error, :tenant_database_connection_initializing} ->
        call_external_node(tenant_id, opts)

      # Waiting for the connection timed out; callers see it as unavailable.
      {:error, :initializing} ->
        {:error, :tenant_database_unavailable}

      {:error, :tenant_database_unavailable} = error ->
        error

      {:error, :tenant_db_too_many_connections} = error ->
        error
    end
  end
78

79
  @doc """
  Looks up the tenant's Connect process in `:syn` and reports the status of its connection.

  When the process is registered with a connection, that connection is returned. When it
  is registered without one yet, the caller waits for the outcome. When nothing is
  registered, `{:error, :tenant_database_connection_initializing}` is returned. Any other
  lookup result is logged and reported as `{:error, :tenant_database_unavailable}`.
  """
  @spec get_status(binary()) ::
          {:ok, pid()}
          | {:error, :tenant_database_unavailable}
          | {:error, :initializing}
          | {:error, :tenant_database_connection_initializing}
          | {:error, :tenant_db_too_many_connections}
  def get_status(tenant_id) do
    case :syn.lookup(__MODULE__, tenant_id) do
      :undefined ->
        {:error, :tenant_database_connection_initializing}

      {connect_pid, %{conn: nil}} ->
        wait_for_connection(connect_pid, tenant_id)

      {_connect_pid, %{conn: db_conn}} ->
        {:ok, db_conn}

      unexpected ->
        log_error("SynInitializationError", unexpected)
        {:error, :tenant_database_unavailable}
    end
  end
104

105
  @doc "Builds the topic name used for the Connect events of `tenant_id`."
  def syn_topic(tenant_id) do
    "connect:#{tenant_id}"
  end
641✔
106

107
  # Blocks the caller until the Connect process `pid` reports an outcome on the tenant's
  # topic, or until the timeout elapses.
  #
  # Returns:
  #   * `{:ok, conn}` when the connection is already registered or a "ready" event arrives
  #   * `{:error, :tenant_db_too_many_connections}` when the process went down for that reason
  #   * `{:error, :tenant_database_unavailable}` when the process went down for any other reason
  #   * `{:error, :initializing}` when nothing arrived within 15 seconds
  #
  # The subscription is always removed in the `after` clause, whatever the outcome.
  defp wait_for_connection(pid, tenant_id) do
    RealtimeWeb.Endpoint.subscribe(syn_topic(tenant_id))

    # We do a lookup after subscribing because we could've missed a message while subscribing
    case :syn.lookup(__MODULE__, tenant_id) do
      {_pid, %{conn: conn}} when is_pid(conn) ->
        {:ok, conn}

      _ ->
        # Wait for up to 15 seconds for an event coming from this specific process;
        # `^pid` makes events from any other process stay unmatched.
        receive do
          %{event: "ready", payload: %{pid: ^pid, conn: conn}} ->
            {:ok, conn}

          %{event: "connect_down", payload: %{pid: ^pid, reason: {:shutdown, :tenant_db_too_many_connections}}} ->
            {:error, :tenant_db_too_many_connections}

          %{event: "connect_down", payload: %{pid: ^pid, reason: _reason}} ->
            metadata = [external_id: tenant_id, project: tenant_id]
            log_error("UnableToConnectToTenantDatabase", "Unable to connect to tenant database", metadata)
            {:error, :tenant_database_unavailable}
        after
          15_000 -> {:error, :initializing}
        end
    end
  after
    RealtimeWeb.Endpoint.unsubscribe(syn_topic(tenant_id))
  end
135

136
  @doc """
  Starts the Connect process of a tenant and returns the status of its connection.

  The process is started under the partitioned dynamic supervisor. Whether it was just
  started or was already running, the result of `get_status/1` is returned. Any other
  start failure is logged and reported as `{:error, :tenant_database_unavailable}`.
  """
  @spec connect(binary(), binary(), keyword()) :: {:ok, DBConnection.t()} | {:error, term()}
  def connect(tenant_id, region, opts \\ []) do
    child_spec = {__MODULE__, [tenant_id: tenant_id, region: region] ++ opts}
    via = {:via, PartitionSupervisor, {Realtime.Tenants.Connect.DynamicSupervisor, tenant_id}}

    via
    |> DynamicSupervisor.start_child(child_spec)
    |> case do
      {:error, {:already_started, _pid}} ->
        get_status(tenant_id)

      {:ok, _pid} ->
        get_status(tenant_id)

      {:error, reason} ->
        log_metadata = [external_id: tenant_id, project: tenant_id]
        log_error("UnableToConnectToTenantDatabase", reason, log_metadata)
        {:error, :tenant_database_unavailable}
    end
  end
159

160
  @doc """
  Returns the pid of the tenant's Connect process registered in `:syn`, or `nil` when
  the lookup does not yield a pid.
  """
  @spec whereis(binary()) :: pid() | nil
  def whereis(tenant_id) do
    with {connect_pid, _meta} when is_pid(connect_pid) <- :syn.lookup(__MODULE__, tenant_id) do
      connect_pid
    else
      _ -> nil
    end
  end
170

171
  @doc """
  Asks the tenant's Connect process, if one is registered, to shut down.

  The request is delivered as a `:shutdown_connect` message, so this function returns
  without waiting for the process to stop. It always returns `:ok`, including when no
  process is registered for the tenant.
  """
  @spec shutdown(binary()) :: :ok
  def shutdown(tenant_id) do
    # `whereis/1` returns a pid or nil; there is nothing to notify in the nil case.
    if pid = whereis(tenant_id), do: send(pid, :shutdown_connect)

    :ok
  end
185

186
  # Starts the Connect GenServer, registered through `:syn` under the tenant id with
  # metadata `%{conn: nil, region: region}`.
  #
  # Options read here: `:tenant_id`, `:region`, `:check_connected_user_interval` and
  # `:check_connect_region_interval`. The whole option list, plus the `:name`, is handed
  # over to `GenServer.start_link/3`.
  def start_link(opts) do
    tenant_id = Keyword.get(opts, :tenant_id)

    initial_state = %__MODULE__{
      tenant_id: tenant_id,
      check_connected_user_interval:
        Keyword.get(opts, :check_connected_user_interval, @check_connected_user_interval_default),
      check_connect_region_interval:
        Keyword.get(opts, :check_connect_region_interval, rebalance_check_interval_in_ms())
    }

    syn_name = {__MODULE__, tenant_id, %{conn: nil, region: Keyword.get(opts, :region)}}
    server_opts = Keyword.put(opts, :name, {:via, :syn, syn_name})

    GenServer.start_link(__MODULE__, initial_state, server_opts)
  end
207

208
  ## GenServer callbacks

  # Only tags the logger metadata with the tenant; the remaining setup is chained
  # through the continue steps, starting with `:db_connect`.
  @impl GenServer
  def init(%{tenant_id: tenant_id} = state) do
    logger_metadata = [external_id: tenant_id, project: tenant_id]
    Logger.metadata(logger_metadata)

    {:ok, state, {:continue, :db_connect}}
  end
216

217
  # First setup step: runs the GetTenant, CheckConnection and RegisterProcess pipes.
  # On success the accumulated state moves on to `:run_migrations`; on failure the
  # process stops, keeping the reason in the shutdown tuple for the two known errors.
  @impl true
  def handle_continue(:db_connect, state) do
    [GetTenant, CheckConnection, RegisterProcess]
    |> Piper.run(state)
    |> case do
      {:ok, connected_state} ->
        {:noreply, connected_state, {:continue, :run_migrations}}

      {:error, reason} when reason in [:tenant_not_found, :tenant_db_too_many_connections] ->
        {:stop, {:shutdown, reason}, state}

      {:error, reason} ->
        log_error("UnableToConnectToTenantDatabase", reason)
        {:stop, :shutdown, state}
    end
  end
240

241
  # Second setup step: runs the tenant migrations and then creates the partitions.
  # Any unexpected result or raised exception is logged and stops the process.
  def handle_continue(:run_migrations, state) do
    %{tenant: tenant, db_conn_pid: db_conn_pid} = state
    Logger.warning("Tenant #{tenant.external_id} is initializing: #{inspect(node())}")

    migration_result = Migrations.run_migrations(tenant)

    # Partitions are only created once the migrations reported :ok or :noop.
    outcome =
      if migration_result in [:ok, :noop] do
        Migrations.create_partitions(db_conn_pid)
      else
        migration_result
      end

    case outcome do
      :ok ->
        {:noreply, state, {:continue, :start_replication}}

      failure ->
        log_error("MigrationsFailedToRun", failure)
        {:stop, :shutdown, state}
    end
  rescue
    exception ->
      log_error("MigrationsFailedToRun", exception)
      {:stop, :shutdown, state}
  end
258

259
  # Third setup step: starts the replication connection, stopping the process on failure.
  def handle_continue(:start_replication, state) do
    state
    |> start_replication_connection()
    |> case do
      {:ok, new_state} -> {:noreply, new_state, {:continue, :setup_connected_user_events}}
      {:error, new_state} -> {:stop, :shutdown, new_state}
    end
  end
265

266
  # Fourth setup step: subscribes to the tenant's operations topic, schedules the first
  # connected-users check and records the tenant id in this module's ETS table.
  def handle_continue(:setup_connected_user_events, state) do
    %{tenant_id: tenant_id, connected_users_bucket: bucket, check_connected_user_interval: interval} = state

    :ok = Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:" <> tenant_id)
    send_connected_user_check_message(bucket, interval)
    :ets.insert(__MODULE__, {tenant_id})

    {:noreply, state, {:continue, :start_connect_region_check}}
  end
278

279
  # Last setup step: schedules the first region check.
  def handle_continue(:start_connect_region_check, state) do
    interval = state.check_connect_region_interval
    send_connect_region_check_message(interval)

    {:noreply, state}
  end
283

284
  # Periodic check: appends the current user count to the bucket, schedules the next
  # message and stores what `send_connected_user_check_message/2` returns as the new bucket.
  @impl GenServer
  def handle_info(
        :check_connected_users,
        %{
          tenant_id: tenant_id,
          check_connected_user_interval: interval,
          connected_users_bucket: bucket
        } = state
      ) do
    refreshed_bucket = update_connected_users_bucket(tenant_id, bucket)
    next_bucket = send_connected_user_check_message(refreshed_bucket, interval)

    {:noreply, %{state | connected_users_bucket: next_bucket}}
  end
300

301
  # Periodic check: hands the previously seen node set and the current one to the
  # Rebalancer. The process either schedules the next check or stops for rebalancing.
  def handle_info({:check_connect_region, previous_nodes_set}, state) do
    nodes_now = MapSet.new(Node.list())

    previous_nodes_set
    |> Rebalancer.check(nodes_now, state.tenant_id)
    |> case do
      {:error, :wrong_region} ->
        Logger.warning("Rebalancing Tenant database connection for a closer region")
        {:stop, {:shutdown, :rebalancing}, state}

      :ok ->
        # Nothing to do now; look again after the configured interval
        send_connect_region_check_message(state.check_connect_region_interval)
        {:noreply, state}
    end
  end
315

316
  # Sent by the connected-users check once the bucket matches the shutdown pattern.
  def handle_info(:shutdown_no_connected_users, state) do
    Logger.info("Tenant has no connected users, database connection will be terminated")

    {:stop, :shutdown, state}
  end

  # Sent by `shutdown/1` to request an explicit stop.
  def handle_info(:shutdown_connect, state) do
    Logger.warning("Shutdowning tenant connection")

    {:stop, :shutdown, state}
  end
325

326
  # The monitored database connection went down: stop this process as well.
  # Using `ref` in both patterns makes the clause match only the stored monitor reference.
  def handle_info({:DOWN, ref, _type, _object, _reason}, %{db_conn_reference: ref} = state) do
    Logger.warning("Database connection has been terminated")

    {:stop, :shutdown, state}
  end
334

335
  # Delay, in milliseconds, before a replication recovery attempt
  @replication_recovery_backoff 1000

  # The monitored replication connection went down: clear it from the state and
  # schedule a recovery attempt instead of stopping the process.
  def handle_info(
        {:DOWN, ref, _type, _object, _reason},
        %{replication_connection_reference: ref} = state
      ) do
    log_warning("ReplicationConnectionDown", "Replication connection has been terminated")
    Process.send_after(self(), :recover_replication_connection, @replication_recovery_backoff)

    {:noreply, %{state | replication_connection_pid: nil, replication_connection_reference: nil}}
  end
347

348
  @replication_connection_query "SELECT 1 from pg_stat_activity where application_name='realtime_replication_connection'"

  # Recovery attempt for the replication connection.
  #
  # A new replication connection is started only when the query finds no row in
  # pg_stat_activity for the replication application name. In every other case — rows
  # still present, the query returning an error tuple, or the start failing — the
  # failure is logged and another attempt is scheduled after the backoff.
  #
  # `Postgrex.query/3` is used rather than `query!/3` so that a query error reaches the
  # `else` branch and is retried, instead of raising inside the callback.
  def handle_info(:recover_replication_connection, state) do
    with {:ok, %{num_rows: 0}} <- Postgrex.query(state.db_conn_pid, @replication_connection_query, []),
         {:ok, state} <- start_replication_connection(state) do
      {:noreply, state}
    else
      _ ->
        log_error("ReplicationConnectionRecoveryFailed", "Replication connection recovery failed")
        Process.send_after(self(), :recover_replication_connection, @replication_recovery_backoff)
        {:noreply, state}
    end
  end
360

361
  # Any message not handled by the clauses above is dropped.
  def handle_info(_message, state) do
    {:noreply, state}
  end
6✔
362

363
  @impl true
  def handle_call(:ready?, _from, state) do
    # The reply is always true: being able to serve this call is the signal the caller
    # is after, i.e. the process got past its handle_continue steps.
    ready = true
    {:reply, ready, state}
  end
369

370
  # Logs the termination reason and deletes the tenant's metrics.
  @impl true
  def terminate(reason, %{tenant_id: tenant_id}) do
    Logger.info("Tenant #{tenant_id} has been terminated: #{inspect(reason)}")

    tenant_id
    |> Realtime.MetricsCleaner.delete_metric()

    :ok
  end
376

377
  ## Private functions

  # Starts the tenant connection through an RPC call to `connect/3` on the node selected
  # for the tenant. Suspended tenants and node-selection failures are returned as they
  # come, without making the call.
  defp call_external_node(tenant_id, opts) do
    Logger.warning("Connection process starting up")
    rpc_timeout = Keyword.get(opts, :rpc_timeout, @rpc_timeout_default)
    tenant = Tenants.Cache.get_tenant_by_external_id(tenant_id)

    with :ok <- tenant_suspended?(tenant),
         {:ok, node, region} <- Realtime.Nodes.get_node_for_tenant(tenant) do
      rpc_opts = [timeout: rpc_timeout, tenant_id: tenant_id]
      Rpc.enhanced_call(node, __MODULE__, :connect, [tenant_id, region, opts], rpc_opts)
    end
  end
391

392
  # Appends the tenant's current user count and keeps only the six most recent samples.
  defp update_connected_users_bucket(tenant_id, connected_users_bucket) do
    latest_count = UsersCounter.tenant_users(tenant_id)

    Enum.take(connected_users_bucket ++ [latest_count], -6)
  end
397

398
  # Schedules the next step of the connected-users check and returns the bucket unchanged.
  #
  # When the bucket equals the shutdown pattern (six consecutive zero counts), the
  # `:shutdown_no_connected_users` message is scheduled; otherwise another
  # `:check_connected_users` message is scheduled. Both are delivered after
  # `check_connected_user_interval` milliseconds.
  #
  # Both clauses return the bucket. The `:check_connected_users` handler stores this
  # return value in the state, so returning the timer reference from
  # `Process.send_after/3` would overwrite the bucket with a reference.
  defp send_connected_user_check_message(
         @connected_users_bucket_shutdown = connected_users_bucket,
         check_connected_user_interval
       ) do
    Process.send_after(self(), :shutdown_no_connected_users, check_connected_user_interval)
    connected_users_bucket
  end

  defp send_connected_user_check_message(connected_users_bucket, check_connected_user_interval) do
    Process.send_after(self(), :check_connected_users, check_connected_user_interval)
    connected_users_bucket
  end
409

410
  # Schedules a region check carrying the node set visible right now, so the handler
  # can compare it with the node set it sees when the message arrives.
  defp send_connect_region_check_message(check_connect_region_interval) do
    nodes_snapshot = MapSet.new(Node.list())
    message = {:check_connect_region, nodes_snapshot}

    Process.send_after(self(), message, check_connect_region_interval)
  end
413

414
  # Returns an error tuple for a suspended tenant and :ok for any other value.
  defp tenant_suspended?(%Tenant{suspend: true}) do
    {:error, :tenant_suspended}
  end

  defp tenant_suspended?(_tenant) do
    :ok
  end
210✔
416

417
  # Reads the interval from the application environment at runtime; raises when unset.
  defp rebalance_check_interval_in_ms do
    Application.fetch_env!(:realtime, :rebalance_check_interval_in_ms)
  end
208✔
418

419
  # Starts the replication connection for the tenant held in the state and monitors it.
  #
  # Returns `{:ok, state}` with the replication pid and monitor reference stored, or
  # `{:error, state}` with the state untouched after logging the failure. Exceptions
  # raised along the way are logged and reported as `{:error, state}` too.
  defp start_replication_connection(state) do
    %{tenant: tenant} = state

    with {:ok, pid} <- ReplicationConnection.start(tenant, self()) do
      monitor_ref = Process.monitor(pid)

      {:ok, %{state | replication_connection_pid: pid, replication_connection_reference: monitor_ref}}
    else
      {:error, :max_wal_senders_reached} ->
        log_error("ReplicationMaxWalSendersReached", "Tenant database has reached the maximum number of WAL senders")
        {:error, state}

      {:error, reason} ->
        log_error("StartReplicationFailed", reason)
        {:error, state}
    end
  rescue
    exception ->
      log_error("StartReplicationFailed", exception)
      {:error, state}
  end
446
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc