• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / d770551fe2a39bcb84c7c7b7353732e1083b80b9-PR-1508

01 Sep 2025 12:50AM UTC coverage: 84.486% (+0.3%) from 84.137%
d770551fe2a39bcb84c7c7b7353732e1083b80b9-PR-1508

Pull #1508

github

edgurgel
chore: fix dialyzer

Database.pool_size_by_application_name always returns a number
Pull Request #1508: feat: rate counter for failing auth due to PG connection errors

44 of 47 new or added lines in 5 files covered. (93.62%)

1 existing line in 1 file now uncovered.

2015 of 2385 relevant lines covered (84.49%)

2791.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.18
/lib/realtime/database.ex
1
defmodule Realtime.Database do
2
  @moduledoc """
3
  Handles tenant database operations
4
  """
5
  use Realtime.Logs
6

7
  alias Realtime.Api.Tenant
8
  alias Realtime.Crypto
9
  alias Realtime.PostgresCdc
10
  alias Realtime.Rpc
11
  alias Realtime.Telemetry
12

13
  # The struct carries the decrypted database password (see `from_settings/3`),
  # so keep that field out of `inspect/1` output such as log lines and crash reports.
  @derive {Inspect, except: [:password]}
  defstruct [
    :hostname,
    :port,
    :database,
    :username,
    :password,
    :pool_size,
    :queue_target,
    :application_name,
    :max_restarts,
    :socket_options,
    ssl: true,
    backoff_type: :rand_exp
  ]

  @typedoc """
  Connection settings for a tenant database.

  `ssl` defaults to `true` on a bare struct; `from_settings/3` stores either `false`
  or a keyword list of SSL options (`[verify: :verify_none]`), hence the union type.
  """
  @type t :: %__MODULE__{
          hostname: binary(),
          database: binary(),
          username: binary(),
          password: binary(),
          port: non_neg_integer(),
          pool_size: non_neg_integer(),
          queue_target: non_neg_integer(),
          application_name: binary(),
          max_restarts: non_neg_integer() | nil,
          ssl: boolean() | keyword(),
          socket_options: list(),
          backoff_type: :stop | :exp | :rand | :rand_exp
        }
42

43
  @cdc "postgres_cdc_rls"
44
  @doc """
  Builds the connection settings struct for `tenant`.

  The tenant's `postgres_cdc_rls` extension settings are looked up and handed to
  `from_settings/3` together with `application_name` and `backoff`.
  """
  @spec from_tenant(Tenant.t(), binary(), :stop | :exp | :rand | :rand_exp) :: t()
  def from_tenant(%Tenant{} = tenant, application_name, backoff \\ :rand_exp) do
    cdc_settings = PostgresCdc.filter_settings(@cdc, tenant.extensions)
    from_settings(cdc_settings, application_name, backoff)
  end
53

54
  @doc """
  Builds the connection settings struct from a raw extension settings map.

  The `"db_host"`, `"db_port"`, `"db_name"`, `"db_user"` and `"db_password"` entries
  are decrypted with `Crypto.decrypt!/1` before use. The pool size is taken from
  `pool_size_by_application_name/2`, the socket address family from
  `detect_ip_version/1` and the SSL options from `default_ssl_param/1`.

  Raises a `MatchError` when `detect_ip_version/1` cannot resolve the host.
  """
  @spec from_settings(map(), binary(), :stop | :exp | :rand | :rand_exp) :: t()
  def from_settings(settings, application_name, backoff \\ :rand_exp) do
    pool_size = pool_size_by_application_name(application_name, settings)

    decrypted =
      settings
      |> Map.take(["db_host", "db_port", "db_name", "db_user", "db_password"])
      |> Map.new(fn {key, encrypted} -> {key, Crypto.decrypt!(encrypted)} end)

    # Decrypted values replace their encrypted counterparts; other keys are kept.
    settings = Map.merge(settings, decrypted)

    {:ok, addrtype} = detect_ip_version(settings["db_host"])

    ssl_opts =
      if default_ssl_param(settings) do
        [verify: :verify_none]
      else
        false
      end

    %__MODULE__{
      hostname: settings["db_host"],
      port: String.to_integer(settings["db_port"]),
      database: settings["db_name"],
      username: settings["db_user"],
      password: settings["db_password"],
      pool_size: pool_size,
      queue_target: settings["db_queue_target"] || 5_000,
      application_name: application_name,
      backoff_type: backoff,
      socket_options: [addrtype],
      ssl: ssl_opts
    }
  end
85

86
  @available_connection_factor 0.95
87

88
  @doc """
  Checks that the tenant's CDC extension settings allow us to connect to and query
  the tenant database, and that the database has enough free connections.

  A probe connection is opened with the `"realtime_connect"` application name. The
  number of available connections (`max_connections` minus the sessions that are not
  `realtime_connect`) must be greater than the summed pool requirements scaled by
  `@available_connection_factor`.

  Returns:

    * `{:ok, conn}` - the probe connection, left open for the caller
    * `{:error, :tenant_not_found}` - when `tenant` is `nil`
    * `{:error, :tenant_db_too_many_connections}` - not enough free connections; the
      probe connection is stopped before returning
    * `{:error, reason}` - the connection or the query failed
  """

  @spec check_tenant_connection(Tenant.t() | nil) :: {:error, atom()} | {:ok, pid()}
  def check_tenant_connection(nil), do: {:error, :tenant_not_found}

  def check_tenant_connection(tenant) do
    settings = PostgresCdc.filter_settings(@cdc, tenant.extensions)
    required_pool = tenant_pool_requirements(settings)

    # The probe must not retry: stop on failure and never restart.
    check_settings = %{from_settings(settings, "realtime_connect", :stop) | max_restarts: 0}

    with {:ok, conn} <- connect_db(check_settings) do
      query =
        "select (current_setting('max_connections')::int - count(*))::int from pg_stat_activity where application_name != 'realtime_connect'"

      case Postgrex.query(conn, query, []) do
        {:ok, %{rows: [[available_connections]]}} ->
          requirement = ceil(required_pool * @available_connection_factor)

          if requirement < available_connections do
            {:ok, conn}
          else
            log_error(
              "DatabaseLackOfConnections",
              "Only #{available_connections} available connections. At least #{requirement} connections are required."
            )

            # The caller never sees `conn` on this path, so release it here instead
            # of holding a connection on a database that is already short of them.
            GenServer.stop(conn)

            {:error, :tenant_db_too_many_connections}
          end

        {:error, e} ->
          Process.exit(conn, :kill)
          log_error("UnableToConnectToTenantDatabase", e)
          {:error, e}
      end
    end
  end
130

131
  @doc """
  Opens a connection to the database of `tenant`.

  The settings are built with `from_tenant/3` and passed to `connect_db/1`.
  `backoff` defaults to `:stop`.
  """
  @spec connect(Tenant.t(), binary(), :stop | :exp | :rand | :rand_exp) ::
          {:ok, pid()} | {:error, any()}
  def connect(tenant, application_name, backoff \\ :stop) do
    db_settings = from_tenant(tenant, application_name, backoff)
    connect_db(db_settings)
  end
141

142
  @doc """
  Returns the boolean stored under `"ssl_enforced"`.

  Anything else (missing key, non-boolean value, non-map argument) yields `true`.
  """
  @spec default_ssl_param(map) :: boolean
  def default_ssl_param(settings) do
    case settings do
      %{"ssl_enforced" => enforced} when is_boolean(enforced) -> enforced
      _ -> true
    end
  end
×
150

151
  @doc """
  Runs `func` within a Postgrex transaction on the node that owns `db_conn`.

  A `%DBConnection{}` or a connection living on the current node is used directly.
  Otherwise the call is forwarded to the connection's node through
  `Rpc.enhanced_call/5`, with that node added to `metadata` under `:target`.
  """
  @spec transaction(pid | DBConnection.t(), fun(), keyword(), keyword()) :: {:ok, any()} | {:error, any()}
  def transaction(db_conn, func, opts \\ [], metadata \\ [])

  def transaction(%DBConnection{} = db_conn, func, opts, metadata) do
    transaction_catched(db_conn, func, opts, metadata)
  end

  def transaction(db_conn, func, opts, metadata) when node() == node(db_conn) do
    transaction_catched(db_conn, func, opts, metadata)
  end

  def transaction(db_conn, func, opts, metadata) do
    target = node(db_conn)
    metadata = Keyword.put(metadata, :target, target)
    remote_args = [db_conn, func, opts, metadata]

    case Rpc.enhanced_call(target, __MODULE__, :transaction, remote_args, metadata) do
      # RPC-level failures are flattened into the regular two-element error tuple.
      {:error, :rpc_error, reason} -> {:error, reason}
      {:ok, _value} = ok -> ok
      {:error, _reason} = error -> error
    end
  end
173

174
  # Runs `Postgrex.transaction/3`, turning raised exceptions and exits into
  # `{:error, _}` tuples after logging them with `metadata`.
  #
  # When `opts` carries a truthy `:telemetry` event, the call is timed in
  # milliseconds and reported through `Telemetry.execute/3`, tagged with the
  # `:tenant_id` option.
  defp transaction_catched(db_conn, func, opts, metadata) do
    case Keyword.get(opts, :telemetry) do
      disabled when disabled in [nil, false] ->
        Postgrex.transaction(db_conn, func, opts)

      event ->
        tenant_id = Keyword.get(opts, :tenant_id)

        {elapsed_ms, result} =
          :timer.tc(Postgrex, :transaction, [db_conn, func, opts], :millisecond)

        Telemetry.execute(event, %{latency: elapsed_ms}, %{tenant: tenant_id})
        result
    end
  rescue
    exception ->
      log_error("ErrorExecutingTransaction", exception, metadata)
      {:error, exception}
  catch
    :exit, exit_reason ->
      log_error("ErrorExecutingTransaction", exit_reason, metadata)
      {:error, {:exit, exit_reason}}
  end
194

195
  @doc """
  Starts a Postgrex connection from a settings struct.

  The application name is sent as a connection parameter. `:max_restarts` is only
  passed to Postgrex when it is set on the struct.
  """
  @spec connect_db(__MODULE__.t()) :: {:ok, pid()} | {:error, any()}
  def connect_db(%__MODULE__{} = settings) do
    app_name = settings.application_name

    # Captured here, in the calling process, and re-applied inside `:configure`
    # with the application name added.
    # NOTE(review): presumably so connection processes log with the caller's
    # metadata — confirm where Postgrex invokes `:configure`.
    caller_metadata = Logger.metadata()

    configure = fn args ->
      caller_metadata
      |> Keyword.put(:application_name, app_name)
      |> Logger.metadata()

      args
    end

    base_opts = [
      hostname: settings.hostname,
      port: settings.port,
      database: settings.database,
      username: settings.username,
      password: settings.password,
      pool_size: settings.pool_size,
      queue_target: settings.queue_target,
      parameters: [application_name: app_name],
      socket_options: settings.socket_options,
      backoff_type: settings.backoff_type,
      ssl: settings.ssl,
      configure: configure
    ]

    start_opts =
      if settings.max_restarts do
        Keyword.put(base_opts, :max_restarts, settings.max_restarts)
      else
        base_opts
      end

    Postgrex.start_link(start_opts)
  end
239

240
  @doc """
  Returns the pool size to use for `application_name`.

  A few application names read an override from `settings` and fall back to 1 when
  it is absent:

    * `realtime_subscription_manager` - `"subcriber_pool_size"`
    * `realtime_subscription_manager_pub` and `realtime_subscription_checker` - `"subs_pool_size"`
    * `realtime_connect` - `"db_pool"`

  `realtime_migrations` is a special case that requires 2 connections.

  Every other name gets 1. For `realtime_rls` and `realtime_broadcast_changes` this
  must stay hardcoded, otherwise replication slots will be tried to be reused,
  leading to errors.
  """
  @spec pool_size_by_application_name(binary(), map() | nil) :: non_neg_integer()
  def pool_size_by_application_name(application_name, settings) do
    case application_name do
      "realtime_migrations" ->
        2

      # NOTE(review): the key is spelled "subcriber" here; presumably it matches the
      # stored settings — confirm before renaming.
      "realtime_subscription_manager" ->
        settings["subcriber_pool_size"] || 1

      subs_app
      when subs_app in ["realtime_subscription_manager_pub", "realtime_subscription_checker"] ->
        settings["subs_pool_size"] || 1

      "realtime_connect" ->
        settings["db_pool"] || 1

      _single_connection_app ->
        1
    end
  end
262

263
  @doc """
  Extracts the external id from a host string.

  The external id is everything before the first `.`; a host without a dot is
  returned whole.
  """
  @spec get_external_id(String.t()) :: {:ok, String.t()} | {:error, atom()}
  def get_external_id(host) when is_binary(host) do
    # With `parts: 2` the split yields one or two elements, so the head always exists.
    [external_id | _rest] = String.split(host, ".", parts: 2)
    {:ok, external_id}
  end
273

274
  @doc """
  Detects which IP version to use for `host`.

  IPv6 is tried first through `:inet6_tcp.getaddr/1`; when that fails,
  `:inet.gethostbyname/1` is tried. Returns `{:error, :nxdomain}` when neither
  lookup succeeds.
  """
  @spec detect_ip_version(String.t()) :: {:ok, :inet | :inet6} | {:error, :nxdomain}
  def detect_ip_version(host) when is_binary(host) do
    # The Erlang lookup functions are called with a charlist.
    charlist = String.to_charlist(host)

    case :inet6_tcp.getaddr(charlist) do
      {:ok, _address} ->
        {:ok, :inet6}

      _ ->
        case :inet.gethostbyname(charlist) do
          {:ok, _hostent} -> {:ok, :inet}
          _ -> {:error, :nxdomain}
        end
    end
  end
287

288
  @doc """
  Terminates all replication slots with the name containing 'realtime' in the tenant database.

  A dedicated connection is opened for the call and stopped before returning. Every
  slot returned by the lookup query is passed to `replication_slot_teardown/2`. If
  the lookup query fails, nothing is torn down and `:ok` is still returned.

  Raises a `MatchError` when the connection cannot be established.
  """
  @spec replication_slot_teardown(Tenant.t()) :: :ok
  def replication_slot_teardown(tenant) do
    {:ok, conn} = connect(tenant, "realtime_replication_slot_teardown")

    query =
      "select slot_name from pg_replication_slots where slot_name like '%realtime%'"

    # `rows` holds one single-column row per slot. Matching on `[rows]` would only
    # succeed for exactly one slot and silently skip the teardown otherwise.
    with {:ok, %{rows: rows}} <- Postgrex.query(conn, query, []) do
      for [slot_name] <- rows, not is_nil(slot_name) do
        replication_slot_teardown(conn, slot_name)
      end
    end

    GenServer.stop(conn)
    :ok
  end
307

308
  @doc """
  Terminates replication slot with a given name in the tenant database.

  Accepts either a `%Tenant{}` or an already open connection:

    * with a tenant, a connection is opened for the call and stopped afterwards;
    * with a connection, it is used as is and left open for the caller.

  Both queries are best effort: their return values are ignored.
  """
  @spec replication_slot_teardown(pid() | Tenant.t(), String.t()) :: :ok
  def replication_slot_teardown(%Tenant{} = tenant, slot_name) do
    {:ok, conn} = connect(tenant, "realtime_replication_slot_teardown")
    replication_slot_teardown(conn, slot_name)

    # This connection never leaves the function, so stop it rather than leaving
    # it open for the lifetime of the calling process.
    GenServer.stop(conn)
    :ok
  end

  def replication_slot_teardown(conn, slot_name) do
    # First terminate the backend currently attached to the slot (if any)...
    Postgrex.query(
      conn,
      "select active_pid, pg_terminate_backend(active_pid), pg_drop_replication_slot(slot_name) from pg_replication_slots where slot_name = $1",
      [slot_name]
    )

    # ...then issue a second drop by name.
    Postgrex.query(conn, "select pg_drop_replication_slot($1)", [slot_name])
    :ok
  end
328

329
  @doc """
  Converts a settings struct into a keyword list with one entry per struct field.

  ## Examples

      iex> Database.opts(%Database{hostname: "localhost", port: 5432, database: "realtime", username: "postgres", password: "postgres", application_name: "test", backoff_type: :stop, pool_size: 10, queue_target: 10_000, socket_options: [:inet], ssl: true}) |> Enum.sort()
      [
        application_name: "test",
        backoff_type: :stop,
        database: "realtime",
        hostname: "localhost",
        max_restarts: nil,
        password: "postgres",
        pool_size: 10,
        port: 5432,
        queue_target: 10000,
        socket_options: [:inet],
        ssl: true,
        username: "postgres"
      ]
  """

  @spec opts(__MODULE__.t()) :: keyword()
  def opts(%__MODULE__{} = settings) do
    fields = Map.to_list(Map.from_struct(settings))
    Keyword.new(fields)
  end
357

358
  # Sums the pool sizes of the application names listed below, as reported by
  # `pool_size_by_application_name/2` for the given `settings`.
  defp tenant_pool_requirements(settings) do
    [
      "realtime_subscription_manager",
      "realtime_subscription_manager_pub",
      "realtime_subscription_checker",
      "realtime_health_check",
      "realtime_janitor",
      "realtime_migrations",
      "realtime_broadcast_changes",
      "realtime_rls",
      "realtime_replication_slot_teardown",
      "realtime_connect"
    ]
    |> Enum.map(&pool_size_by_application_name(&1, settings))
    |> Enum.sum()
  end
376
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc