• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / a15a1f3cd1cc1b2b9b3c7e748c28e7b0a6745084-PR-1466

23 Jul 2025 12:52AM UTC coverage: 85.292% (+0.1%) from 85.146%
a15a1f3cd1cc1b2b9b3c7e748c28e7b0a6745084-PR-1466

Pull #1466

github

edgurgel
fix: flaky test
Pull Request #1466: fix: wait for `Connect.lookup_or_start_connection` to be ready

11 of 12 new or added lines in 2 files covered. (91.67%)

6 existing lines in 4 files now uncovered.

2047 of 2400 relevant lines covered (85.29%)

1752.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.25
/lib/realtime/database.ex
1
defmodule Realtime.Database do
2
  @moduledoc """
3
  Handles tenant database operations
4
  """
5
  use Realtime.Logs
6

7
  alias Realtime.Api.Tenant
8
  alias Realtime.Crypto
9
  alias Realtime.PostgresCdc
10
  alias Realtime.Rpc
11
  alias Realtime.Telemetry
12

13
  defstruct [
14
    :hostname,
15
    :port,
16
    :database,
17
    :username,
18
    :password,
19
    :pool_size,
20
    :queue_target,
21
    :application_name,
22
    :max_restarts,
23
    :socket_options,
24
    ssl: true,
25
    backoff_type: :rand_exp
26
  ]
27

28
  @type t :: %__MODULE__{
29
          hostname: binary(),
30
          database: binary(),
31
          username: binary(),
32
          password: binary(),
33
          port: non_neg_integer(),
34
          pool_size: non_neg_integer(),
35
          queue_target: non_neg_integer(),
36
          application_name: binary(),
37
          max_restarts: non_neg_integer() | nil,
38
          ssl: boolean(),
39
          socket_options: list(),
40
          backoff_type: :stop | :exp | :rand | :rand_exp
41
        }
42

43
  @cdc "postgres_cdc_rls"
44
  @doc """
45
  Creates a database connection struct from the given tenant.
46
  """
47
  @spec from_tenant(Tenant.t(), binary(), :stop | :exp | :rand | :rand_exp) :: t()
48
  def from_tenant(%Tenant{} = tenant, application_name, backoff \\ :rand_exp) do
49
    tenant
50
    |> then(&Realtime.PostgresCdc.filter_settings(@cdc, &1.extensions))
1,581✔
51
    |> then(&from_settings(&1, application_name, backoff))
1,581✔
52
  end
53

54
  @doc """
55
  Creates a database connection struct from the given settings.
56
  """
57
  @spec from_settings(map(), binary(), :stop | :exp | :rand | :rand_exp) :: t()
58
  def from_settings(settings, application_name, backoff \\ :rand_exp) do
59
    pool = pool_size_by_application_name(application_name, settings)
2,660✔
60

61
    settings =
2,660✔
62
      settings
63
      |> Map.take(["db_host", "db_port", "db_name", "db_user", "db_password"])
64
      |> Enum.map(fn {k, v} -> {k, Crypto.decrypt!(v)} end)
13,300✔
65
      |> Map.new()
66
      |> then(&Map.merge(settings, &1))
2,660✔
67

68
    {:ok, addrtype} = detect_ip_version(settings["db_host"])
2,660✔
69
    ssl = if default_ssl_param(settings), do: [verify: :verify_none], else: false
2,648✔
70

71
    %__MODULE__{
2,648✔
72
      hostname: settings["db_host"],
73
      port: String.to_integer(settings["db_port"]),
74
      database: settings["db_name"],
75
      username: settings["db_user"],
76
      password: settings["db_password"],
77
      pool_size: pool,
78
      queue_target: settings["db_queue_target"] || 5_000,
2,648✔
79
      application_name: application_name,
80
      backoff_type: backoff,
81
      socket_options: [addrtype],
82
      ssl: ssl
83
    }
84
  end
85

86
  @available_connection_factor 0.95
87

88
  @doc """
89
  Checks if the Tenant CDC extension information is properly configured and that we're able to query against the tenant database.
90
  """
91

92
  @spec check_tenant_connection(Tenant.t() | nil) :: {:error, atom()} | {:ok, pid()}
93
  def check_tenant_connection(nil), do: {:error, :tenant_not_found}
×
94

95
  def check_tenant_connection(tenant) do
96
    tenant
97
    |> then(&PostgresCdc.filter_settings(@cdc, &1.extensions))
364✔
98
    |> then(fn settings ->
364✔
99
      required_pool = tenant_pool_requirements(settings)
364✔
100
      check_settings = from_settings(settings, "realtime_connect", :stop)
364✔
101
      check_settings = Map.put(check_settings, :max_restarts, 0)
364✔
102

103
      with {:ok, conn} <- connect_db(check_settings) do
364✔
104
        query =
364✔
105
          "select (current_setting('max_connections')::int - count(*))::int from pg_stat_activity where application_name != 'realtime_connect'"
106

107
        case Postgrex.query(conn, query, []) do
364✔
108
          {:ok, %{rows: [[available_connections]]}} ->
109
            requirement = ceil(required_pool * @available_connection_factor)
350✔
110

111
            if requirement < available_connections do
350✔
112
              {:ok, conn}
113
            else
114
              log_error(
6✔
115
                "DatabaseLackOfConnections",
116
                "Only #{available_connections} available connections. At least #{requirement} connections are required."
6✔
117
              )
118

119
              {:error, :tenant_db_too_many_connections}
120
            end
121

122
          {:error, e} ->
123
            Process.exit(conn, :kill)
×
124
            log_error("UnableToConnectToTenantDatabase", e)
×
125
            {:error, e}
126
        end
127
      end
128
    end)
129
  end
130

131
  @doc """
132
  Connects to the database using the given settings.
133
  """
134
  @spec connect(Tenant.t(), binary(), :stop | :exp | :rand | :rand_exp) ::
135
          {:ok, pid()} | {:error, any()}
136
  def connect(tenant, application_name, backoff \\ :stop) do
137
    tenant
138
    |> from_tenant(application_name, backoff)
139
    |> connect_db()
570✔
140
  end
141

142
  @doc """
143
  If the param `ssl_enforced` is not set, it defaults to true.
144
  """
145
  @spec default_ssl_param(map) :: boolean
146
  def default_ssl_param(%{"ssl_enforced" => ssl_enforced}) when is_boolean(ssl_enforced),
147
    do: ssl_enforced
2,648✔
148

149
  def default_ssl_param(_), do: true
×
150

151
  @doc """
152
  Runs database transaction in local node or against a target node withing a Postgrex transaction
153
  """
154
  @spec transaction(pid | DBConnection.t(), fun(), keyword(), keyword()) :: {:ok, any()} | {:error, any()}
155
  def transaction(db_conn, func, opts \\ [], metadata \\ [])
5,793✔
156

157
  def transaction(%DBConnection{} = db_conn, func, opts, metadata),
158
    do: transaction_catched(db_conn, func, opts, metadata)
×
159

160
  def transaction(db_conn, func, opts, metadata) when node() == node(db_conn),
161
    do: transaction_catched(db_conn, func, opts, metadata)
6,001✔
162

163
  def transaction(db_conn, func, opts, metadata) do
164
    metadata = Keyword.put(metadata, :target, node(db_conn))
8✔
165
    args = [db_conn, func, opts, metadata]
8✔
166

167
    case Rpc.enhanced_call(node(db_conn), __MODULE__, :transaction, args, metadata) do
8✔
168
      {:ok, value} -> {:ok, value}
2✔
169
      {:error, :rpc_error, error} -> {:error, error}
2✔
170
      {:error, error} -> {:error, error}
4✔
171
    end
172
  end
173

174
  defp transaction_catched(db_conn, func, opts, metadata) do
6,001✔
175
    telemetry = Keyword.get(opts, :telemetry, nil)
6,001✔
176

177
    if telemetry do
6,001✔
178
      tenant_id = Keyword.get(opts, :tenant_id, nil)
210✔
179
      {latency, value} = :timer.tc(Postgrex, :transaction, [db_conn, func, opts], :millisecond)
210✔
180
      Telemetry.execute(telemetry, %{latency: latency}, %{tenant: tenant_id})
206✔
181
      value
206✔
182
    else
183
      Postgrex.transaction(db_conn, func, opts)
5,791✔
184
    end
185
  rescue
186
    e ->
18✔
187
      log_error("ErrorExecutingTransaction", e, metadata)
18✔
188
      {:error, e}
189
  end
190

191
  @spec connect_db(__MODULE__.t()) :: {:ok, pid()} | {:error, any()}
192
  def connect_db(%__MODULE__{} = settings) do
193
    %__MODULE__{
194
      hostname: hostname,
195
      port: port,
196
      database: database,
197
      username: username,
198
      password: password,
199
      pool_size: pool_size,
200
      queue_target: queue_target,
201
      application_name: application_name,
202
      backoff_type: backoff_type,
203
      max_restarts: max_restarts,
204
      socket_options: socket_options,
205
      ssl: ssl
206
    } = settings
1,653✔
207

208
    metadata = Logger.metadata()
1,653✔
209

210
    [
211
      hostname: hostname,
212
      port: port,
213
      database: database,
214
      username: username,
215
      password: password,
216
      pool_size: pool_size,
217
      queue_target: queue_target,
218
      parameters: [application_name: application_name],
219
      socket_options: socket_options,
220
      backoff_type: backoff_type,
221
      ssl: ssl,
222
      configure: fn args ->
223
        metadata
224
        |> Keyword.put(:application_name, application_name)
225
        |> Logger.metadata()
2,421✔
226

227
        args
2,421✔
228
      end
229
    ]
230
    |> then(fn opts ->
231
      if max_restarts, do: Keyword.put(opts, :max_restarts, max_restarts), else: opts
1,653✔
232
    end)
233
    |> Postgrex.start_link()
1,653✔
234
  end
235

236
  @doc """
237
  Returns the pool size for a given application name. Override pool size if provided.
238

239
  `realtime_rls` and `realtime_broadcast_changes` will be handled as a special scenario as it will need to be hardcoded as 1 otherwise replication slots will be tried to be reused leading to errors
240
  `realtime_migrations` will be handled as a special scenario as it requires 2 connections.
241
  """
242
  @spec pool_size_by_application_name(binary(), map() | nil) :: non_neg_integer()
243
  def pool_size_by_application_name(application_name, settings) do
244
    case application_name do
6,324✔
245
      "realtime_subscription_manager" -> settings["subcriber_pool_size"] || 1
406✔
246
      "realtime_subscription_manager_pub" -> settings["subs_pool_size"] || 1
403✔
247
      "realtime_subscription_checker" -> settings["subs_pool_size"] || 1
406✔
248
      "realtime_connect" -> settings["db_pool"] || 1
738✔
249
      "realtime_health_check" -> 1
370✔
250
      "realtime_janitor" -> 1
398✔
251
      "realtime_migrations" -> 2
928✔
252
      "realtime_broadcast_changes" -> 1
761✔
253
      "realtime_rls" -> 1
418✔
254
      "realtime_replication_slot_teardown" -> 1
372✔
255
      _ -> 1
1,124✔
256
    end
257
  end
258

259
  @doc """
260
  Gets the external id from a host connection string found in the conn.
261
  """
262
  @spec get_external_id(String.t()) :: {:ok, String.t()} | {:error, atom()}
263
  def get_external_id(host) when is_binary(host) do
264
    case String.split(host, ".", parts: 2) do
218✔
265
      [id] -> {:ok, id}
146✔
266
      [id, _] -> {:ok, id}
72✔
267
    end
268
  end
269

270
  @doc """
271
  Detects the IP version for a given host.
272
  """
273
  @spec detect_ip_version(String.t()) :: {:ok, :inet | :inet6} | {:error, :nxdomain}
274
  def detect_ip_version(host) when is_binary(host) do
275
    host = String.to_charlist(host)
2,672✔
276

277
    cond do
2,672✔
278
      match?({:ok, _}, :inet6_tcp.getaddr(host)) -> {:ok, :inet6}
2,669✔
279
      match?({:ok, _}, :inet.gethostbyname(host)) -> {:ok, :inet}
2,665✔
280
      true -> {:error, :nxdomain}
2✔
281
    end
282
  end
283

284
  @doc """
285
  Terminates all replication slots with the name containing 'realtime' in the tenant database.
286
  """
287
  @spec replication_slot_teardown(Tenant.t()) :: :ok
288
  def replication_slot_teardown(tenant) do
289
    {:ok, conn} = connect(tenant, "realtime_replication_slot_teardown")
6✔
290

291
    query =
6✔
292
      "select slot_name from pg_replication_slots where slot_name like '%realtime%'"
293

294
    with {:ok, %{rows: [rows]}} <- Postgrex.query(conn, query, []) do
6✔
295
      rows
UNCOV
296
      |> Enum.reject(&is_nil/1)
1✔
UNCOV
297
      |> Enum.each(&replication_slot_teardown(conn, &1))
1✔
298
    end
299

300
    GenServer.stop(conn)
6✔
301
    :ok
302
  end
303

304
  @doc """
305
  Terminates replication slot with a given name in the tenant database.
306
  """
307
  @spec replication_slot_teardown(pid() | Tenant.t(), String.t()) :: :ok
308
  def replication_slot_teardown(%Tenant{} = tenant, slot_name) do
309
    {:ok, conn} = connect(tenant, "realtime_replication_slot_teardown")
2✔
310
    replication_slot_teardown(conn, slot_name)
2✔
311
    :ok
312
  end
313

314
  def replication_slot_teardown(conn, slot_name) do
315
    Postgrex.query(
5✔
316
      conn,
317
      "select active_pid, pg_terminate_backend(active_pid), pg_drop_replication_slot(slot_name) from pg_replication_slots where slot_name = $1",
318
      [slot_name]
319
    )
320

321
    Postgrex.query(conn, "select pg_drop_replication_slot($1)", [slot_name])
5✔
322
    :ok
323
  end
324

325
  @doc """
326
  Transforms database settings into keyword list to be used by Postgrex.
327
  ## Examples
328

329
  iex> Database.opts(%Database{hostname: "localhost", port: 5432, database: "realtime", username: "postgres", password: "postgres", application_name: "test", backoff_type: :stop, pool_size: 10, queue_target: 10_000, socket_options: [:inet], ssl: true}) |> Enum.sort()
330
  [
331
    application_name: "test",
332
    backoff_type: :stop,
333
    database: "realtime",
334
    hostname: "localhost",
335
    max_restarts: nil,
336
    password: "postgres",
337
    pool_size: 10,
338
    port: 5432,
339
    queue_target: 10000,
340
    socket_options: [:inet],
341
    ssl: true,
342
    username: "postgres"
343
  ]
344
  """
345

346
  @spec opts(__MODULE__.t()) :: keyword()
347
  def opts(%__MODULE__{} = settings) do
348
    settings
349
    |> Map.from_struct()
350
    |> Map.to_list()
351
    |> Keyword.new()
8✔
352
  end
353

354
  defp tenant_pool_requirements(settings) do
355
    application_names = [
364✔
356
      "realtime_subscription_manager",
357
      "realtime_subscription_manager_pub",
358
      "realtime_subscription_checker",
359
      "realtime_health_check",
360
      "realtime_janitor",
361
      "realtime_migrations",
362
      "realtime_broadcast_changes",
363
      "realtime_rls",
364
      "realtime_replication_slot_teardown",
365
      "realtime_connect"
366
    ]
367

368
    Enum.reduce(application_names, 0, fn application_name, acc ->
364✔
369
      acc + pool_size_by_application_name(application_name, settings)
3,640✔
370
    end)
371
  end
372
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc