• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / 1ade961f479f27895cfee27bf5fa17f58a8aa15f-PR-1403

05 Jun 2025 02:12PM UTC coverage: 83.763% (+0.7%) from 83.027%
1ade961f479f27895cfee27bf5fa17f58a8aa15f-PR-1403

Pull #1403

github

filipecabaco
fix tests
Pull Request #1403: fix: handle tuple errors

5 of 9 new or added lines in 2 files covered. (55.56%)

7 existing lines in 3 files now uncovered.

1821 of 2174 relevant lines covered (83.76%)

4367.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.18
/lib/realtime/database.ex
1
defmodule Realtime.Database do
2
  @moduledoc """
3
  Handles tenant database operations
4
  """
5
  use Realtime.Logs
6

7
  alias Realtime.Api.Tenant
8
  alias Realtime.Crypto
9
  alias Realtime.PostgresCdc
10
  alias Realtime.Rpc
11
  alias Realtime.Telemetry
12

13
  defstruct [
14
    :hostname,
15
    :port,
16
    :database,
17
    :username,
18
    :password,
19
    :pool_size,
20
    :queue_target,
21
    :application_name,
22
    :max_restarts,
23
    :socket_options,
24
    ssl: true,
25
    backoff_type: :rand_exp
26
  ]
27

28
  @type t :: %__MODULE__{
29
          hostname: binary(),
30
          database: binary(),
31
          username: binary(),
32
          password: binary(),
33
          port: non_neg_integer(),
34
          pool_size: non_neg_integer(),
35
          queue_target: non_neg_integer(),
36
          application_name: binary(),
37
          max_restarts: non_neg_integer() | nil,
38
          ssl: boolean(),
39
          socket_options: list(),
40
          backoff_type: :stop | :exp | :rand | :rand_exp
41
        }
42

43
  @cdc "postgres_cdc_rls"
44
  @doc """
45
  Creates a database connection struct from the given tenant.
46
  """
47
  @spec from_tenant(Tenant.t(), binary(), :stop | :exp | :rand | :rand_exp) :: t()
48
  def from_tenant(%Tenant{} = tenant, application_name, backoff \\ :rand_exp) do
49
    tenant
50
    |> then(&Realtime.PostgresCdc.filter_settings(@cdc, &1.extensions))
1,571✔
51
    |> then(&from_settings(&1, application_name, backoff))
1,571✔
52
  end
53

54
  @doc """
55
  Creates a database connection struct from the given settings.
56
  """
57
  @spec from_settings(map(), binary(), :stop | :exp | :rand | :rand_exp) :: t()
58
  def from_settings(settings, application_name, backoff \\ :rand_exp) do
59
    pool = pool_size_by_application_name(application_name, settings)
2,283✔
60

61
    settings =
2,283✔
62
      settings
63
      |> Map.take(["db_host", "db_port", "db_name", "db_user", "db_password"])
64
      |> Enum.map(fn {k, v} -> {k, Crypto.decrypt!(v)} end)
11,415✔
65
      |> Map.new()
66
      |> then(&Map.merge(settings, &1))
2,283✔
67

68
    {:ok, addrtype} = detect_ip_version(settings["db_host"])
2,283✔
69
    ssl = if default_ssl_param(settings), do: [verify: :verify_none], else: false
2,279✔
70

71
    %__MODULE__{
2,279✔
72
      hostname: settings["db_host"],
73
      port: String.to_integer(settings["db_port"]),
74
      database: settings["db_name"],
75
      username: settings["db_user"],
76
      password: settings["db_password"],
77
      pool_size: pool,
78
      queue_target: settings["db_queue_target"] || 5_000,
2,279✔
79
      application_name: application_name,
80
      backoff_type: backoff,
81
      socket_options: [addrtype],
82
      ssl: ssl
83
    }
84
  end
85

86
  @available_connection_factor 0.95
87

88
  @doc """
89
  Checks if the Tenant CDC extension information is properly configured and that we're able to query against the tenant database.
90
  """
91

92
  @spec check_tenant_connection(Tenant.t() | nil) :: {:error, atom()} | {:ok, pid()}
93
  def check_tenant_connection(nil), do: {:error, :tenant_not_found}
×
94

95
  def check_tenant_connection(tenant) do
96
    tenant
97
    |> then(&PostgresCdc.filter_settings(@cdc, &1.extensions))
232✔
98
    |> then(fn settings ->
232✔
99
      required_pool = tenant_pool_requirements(settings)
232✔
100
      check_settings = from_settings(settings, "realtime_connect", :stop)
232✔
101
      check_settings = Map.put(check_settings, :max_restarts, 0)
232✔
102

103
      with {:ok, conn} <- connect_db(check_settings) do
232✔
104
        query =
232✔
105
          "select (current_setting('max_connections')::int - count(*))::int from pg_stat_activity where application_name != 'realtime_connect'"
106

107
        case Postgrex.query(conn, query, []) do
232✔
108
          {:ok, %{rows: [[available_connections]]}} ->
109
            requirement = ceil(required_pool * @available_connection_factor)
215✔
110

111
            if requirement < available_connections do
215✔
112
              {:ok, conn}
113
            else
114
              log_error(
6✔
115
                "DatabaseLackOfConnections",
116
                "Only #{available_connections} available connections. At least #{requirement} connections are required."
6✔
117
              )
118

119
              {:error, :tenant_db_too_many_connections}
120
            end
121

122
          {:error, e} ->
123
            Process.exit(conn, :kill)
×
124
            log_error("UnableToConnectToTenantDatabase", e)
×
125
            {:error, e}
126
        end
127
      end
128
    end)
129
  end
130

131
  @doc """
132
  Connects to the database using the given settings.
133
  """
134
  @spec connect(Tenant.t(), binary(), :stop | :exp | :rand | :rand_exp) ::
135
          {:ok, pid()} | {:error, any()}
136
  def connect(tenant, application_name, backoff \\ :stop) do
137
    tenant
138
    |> from_tenant(application_name, backoff)
139
    |> connect_db()
556✔
140
  end
141

142
  @doc """
143
  If the param `ssl_enforced` is not set, it defaults to true.
144
  """
145
  @spec default_ssl_param(map) :: boolean
146
  def default_ssl_param(%{"ssl_enforced" => ssl_enforced}) when is_boolean(ssl_enforced),
147
    do: ssl_enforced
2,279✔
148

149
  def default_ssl_param(_), do: true
×
150

151
  @doc """
152
  Runs database transaction in local node or against a target node withing a Postgrex transaction
153
  """
154
  @spec transaction(pid | DBConnection.t(), fun(), keyword(), keyword()) :: {:ok, any()} | {:error, any()}
155
  def transaction(db_conn, func, opts \\ [], metadata \\ [])
5,105✔
156

157
  def transaction(%DBConnection{} = db_conn, func, opts, metadata),
158
    do: transaction_catched(db_conn, func, opts, metadata)
×
159

160
  def transaction(db_conn, func, opts, metadata) when node() == node(db_conn),
161
    do: transaction_catched(db_conn, func, opts, metadata)
5,104✔
162

163
  def transaction(db_conn, func, opts, metadata) do
UNCOV
164
    metadata = Keyword.put(metadata, :target, node(db_conn))
3✔
UNCOV
165
    args = [db_conn, func, opts, metadata]
3✔
166

UNCOV
167
    case Rpc.enhanced_call(node(db_conn), __MODULE__, :transaction, args) do
3✔
UNCOV
168
      {:ok, value} -> {:ok, value}
1✔
169
      {:error, :rpc_error, error} -> {:error, error}
×
UNCOV
170
      {:error, error} -> {:error, error}
2✔
171
    end
172
  end
173

174
  defp transaction_catched(db_conn, func, opts, metadata) do
5,104✔
175
    telemetry = Keyword.get(opts, :telemetry, nil)
5,104✔
176

177
    if telemetry do
5,104✔
178
      tenant_id = Keyword.get(opts, :tenant_id, nil)
170✔
179
      {latency, value} = :timer.tc(Postgrex, :transaction, [db_conn, func, opts], :millisecond)
170✔
180
      Telemetry.execute(telemetry, %{latency: latency}, %{tenant_id: tenant_id})
160✔
181
      value
160✔
182
    else
183
      Postgrex.transaction(db_conn, func, opts)
4,934✔
184
    end
185
  rescue
186
    e ->
26✔
187
      log_error("ErrorExecutingTransaction", e, metadata)
26✔
188
      {:error, e}
189
  end
190

191
  @spec connect_db(__MODULE__.t()) :: {:ok, pid()} | {:error, any()}
192
  def connect_db(%__MODULE__{} = settings) do
193
    %__MODULE__{
194
      hostname: hostname,
195
      port: port,
196
      database: database,
197
      username: username,
198
      password: password,
199
      pool_size: pool_size,
200
      queue_target: queue_target,
201
      application_name: application_name,
202
      backoff_type: backoff_type,
203
      max_restarts: max_restarts,
204
      socket_options: socket_options,
205
      ssl: ssl
206
    } = settings
1,284✔
207

208
    Logger.metadata(application_name: application_name)
1,284✔
209
    metadata = Logger.metadata()
1,284✔
210

211
    [
212
      hostname: hostname,
213
      port: port,
214
      database: database,
215
      username: username,
216
      password: password,
217
      pool_size: pool_size,
218
      queue_target: queue_target,
219
      parameters: [application_name: application_name],
220
      socket_options: socket_options,
221
      backoff_type: backoff_type,
222
      ssl: ssl,
223
      configure: fn args ->
224
        Logger.metadata(metadata)
3,193✔
225
        args
3,193✔
226
      end
227
    ]
228
    |> then(fn opts ->
229
      if max_restarts, do: Keyword.put(opts, :max_restarts, max_restarts), else: opts
1,284✔
230
    end)
231
    |> Postgrex.start_link()
1,284✔
232
  end
233

234
  @doc """
235
  Returns the pool size for a given application name. Override pool size if provided.
236

237
  `realtime_rls` and `realtime_broadcast_changes` will be handled as a special scenario as it will need to be hardcoded as 1 otherwise replication slots will be tried to be reused leading to errors
238
  `realtime_migrations` will be handled as a special scenario as it requires 2 connections.
239
  """
240
  @spec pool_size_by_application_name(binary(), map() | nil) :: non_neg_integer()
241
  def pool_size_by_application_name(application_name, settings) do
242
    case application_name do
4,627✔
243
      "realtime_subscription_manager" -> settings["subcriber_pool_size"] || 1
245✔
244
      "realtime_subscription_manager_pub" -> settings["subs_pool_size"] || 1
245✔
245
      "realtime_subscription_checker" -> settings["subs_pool_size"] || 1
245✔
246
      "realtime_connect" -> settings["db_pool"] || 1
474✔
247
      "realtime_health_check" -> 1
238✔
248
      "realtime_janitor" -> 1
274✔
249
      "realtime_migrations" -> 2
660✔
250
      "realtime_broadcast_changes" -> 1
540✔
251
      "realtime_rls" -> 1
271✔
252
      "realtime_replication_slot_teardown" -> 1
240✔
253
      _ -> 1
1,195✔
254
    end
255
  end
256

257
  @doc """
258
  Gets the external id from a host connection string found in the conn.
259
  """
260
  @spec get_external_id(String.t()) :: {:ok, String.t()} | {:error, atom()}
261
  def get_external_id(host) when is_binary(host) do
262
    case String.split(host, ".", parts: 2) do
168✔
263
      [id] -> {:ok, id}
106✔
264
      [id, _] -> {:ok, id}
62✔
265
    end
266
  end
267

268
  @doc """
269
  Detects the IP version for a given host.
270
  """
271
  @spec detect_ip_version(String.t()) :: {:ok, :inet | :inet6} | {:error, :nxdomain}
272
  def detect_ip_version(host) when is_binary(host) do
273
    host = String.to_charlist(host)
2,295✔
274

275
    cond do
2,295✔
276
      match?({:ok, _}, :inet6_tcp.getaddr(host)) -> {:ok, :inet6}
2,293✔
277
      match?({:ok, _}, :inet.gethostbyname(host)) -> {:ok, :inet}
2,289✔
278
      true -> {:error, :nxdomain}
2✔
279
    end
280
  end
281

282
  @doc """
283
  Terminates all replication slots with the name containing 'realtime' in the tenant database.
284
  """
285
  @spec replication_slot_teardown(Tenant.t()) :: :ok
286
  def replication_slot_teardown(tenant) do
287
    {:ok, conn} = connect(tenant, "realtime_replication_slot_teardown")
6✔
288

289
    query =
6✔
290
      "select slot_name from pg_replication_slots where slot_name like '%realtime%'"
291

292
    with {:ok, %{rows: [rows]}} <- Postgrex.query(conn, query, []) do
6✔
293
      rows
294
      |> Enum.reject(&is_nil/1)
2✔
295
      |> Enum.each(&replication_slot_teardown(conn, &1))
2✔
296
    end
297

298
    GenServer.stop(conn)
6✔
299
    :ok
300
  end
301

302
  @doc """
303
  Terminates replication slot with a given name in the tenant database.
304
  """
305
  @spec replication_slot_teardown(pid() | Tenant.t(), String.t()) :: :ok
306
  def replication_slot_teardown(%Tenant{} = tenant, slot_name) do
307
    {:ok, conn} = connect(tenant, "realtime_replication_slot_teardown")
2✔
308
    replication_slot_teardown(conn, slot_name)
2✔
309
    :ok
310
  end
311

312
  def replication_slot_teardown(conn, slot_name) do
313
    Postgrex.query(
6✔
314
      conn,
315
      "select active_pid, pg_terminate_backend(active_pid), pg_drop_replication_slot(slot_name) from pg_replication_slots where slot_name = $1",
316
      [slot_name]
317
    )
318

319
    Postgrex.query(conn, "select pg_drop_replication_slot($1)", [slot_name])
6✔
320
    :ok
321
  end
322

323
  @doc """
324
  Transforms database settings into keyword list to be used by Postgrex.
325
  ## Examples
326

327
  iex> Database.opts(%Database{hostname: "localhost", port: 5432, database: "realtime", username: "postgres", password: "postgres", application_name: "test", backoff_type: :stop, pool_size: 10, queue_target: 10_000, socket_options: [:inet], ssl: true}) |> Enum.sort()
328
  [
329
    application_name: "test",
330
    backoff_type: :stop,
331
    database: "realtime",
332
    hostname: "localhost",
333
    max_restarts: nil,
334
    password: "postgres",
335
    pool_size: 10,
336
    port: 5432,
337
    queue_target: 10000,
338
    socket_options: [:inet],
339
    ssl: true,
340
    username: "postgres"
341
  ]
342
  """
343

344
  @spec opts(__MODULE__.t()) :: keyword()
345
  def opts(%__MODULE__{} = settings) do
346
    settings
347
    |> Map.from_struct()
348
    |> Map.to_list()
349
    |> Keyword.new()
6✔
350
  end
351

352
  defp tenant_pool_requirements(settings) do
353
    application_names = [
232✔
354
      "realtime_subscription_manager",
355
      "realtime_subscription_manager_pub",
356
      "realtime_subscription_checker",
357
      "realtime_health_check",
358
      "realtime_janitor",
359
      "realtime_migrations",
360
      "realtime_broadcast_changes",
361
      "realtime_rls",
362
      "realtime_replication_slot_teardown",
363
      "realtime_connect"
364
    ]
365

366
    Enum.reduce(application_names, 0, fn application_name, acc ->
232✔
367
      acc + pool_size_by_application_name(application_name, settings)
2,320✔
368
    end)
369
  end
370
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc