• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supavisor / 16470769502

23 Jul 2025 12:34PM UTC coverage: 57.067% (+1.7%) from 55.355%
16470769502

Pull #694

github

web-flow
Merge 78a9c0b2c into deaa48192
Pull Request #694: feat: improved named prepared statements support

175 of 217 new or added lines in 11 files covered. (80.65%)

16 existing lines in 4 files now uncovered.

1292 of 2264 relevant lines covered (57.07%)

1126.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.25
/lib/supavisor.ex
1
defmodule Supavisor do
2
  @moduledoc false
3

4
  require Logger
5

6
  alias Supavisor.{
7
    Helpers,
8
    Manager,
9
    Tenants
10
  }
11

12
  @type sock :: tcp_sock() | ssl_sock()
13
  @type ssl_sock :: {:ssl, :ssl.sslsocket()}
14
  @type tcp_sock :: {:gen_tcp, :gen_tcp.socket()}
15
  @type workers :: %{manager: pid, pool: pid}
16
  @type secrets :: {:password | :auth_query, fun()}
17
  @type mode :: :transaction | :session | :native | :proxy
18
  @type id :: {{:single | :cluster, String.t()}, String.t(), mode, String.t(), String.t() | nil}
19
  @type subscribe_opts :: %{workers: workers, ps: list, idle_timeout: integer}
20

21
  @registry Supavisor.Registry.Tenants
22
  @max_pools Application.compile_env(:supavisor, :max_pools, 20)
23

24
  @spec start_dist(id, secrets, keyword()) :: {:ok, pid()} | {:error, any()}
  # Ensures a pool supervisor for `id` exists somewhere in the cluster.
  # Returns the already-registered supervisor when present; otherwise picks a
  # node (forced via opts, or hashed by availability zone) and starts one there.
  def start_dist(id, secrets, options \\ []) do
    opts =
      Keyword.validate!(options, log_level: nil, force_node: false, availability_zone: nil)

    log_level = Keyword.fetch!(opts, :log_level)

    case get_global_sup(id) do
      nil ->
        force_node = Keyword.fetch!(opts, :force_node)
        zone = Keyword.fetch!(opts, :availability_zone)

        # `force_node: false` (the default) means "let the hash decide".
        target_node = if force_node, do: force_node, else: determine_node(id, zone)

        if target_node == node() do
          Logger.debug("Starting local pool for #{inspect(id)}")
          try_start_local_pool(id, secrets, log_level)
        else
          Logger.debug("Starting remote pool for #{inspect(id)}")
          Helpers.rpc(target_node, __MODULE__, :try_start_local_pool, [id, secrets, log_level])
        end

      running_sup ->
        {:ok, running_sup}
    end
  end
49

50
  @spec start(id, secrets) :: {:ok, pid} | {:error, any}
  # Starts a pool for `id` on the local node unless one is already registered
  # globally, in which case the existing supervisor pid is returned.
  def start(id, secrets) do
    case get_global_sup(id) do
      nil -> try_start_local_pool(id, secrets, nil)
      sup_pid -> {:ok, sup_pid}
    end
  end
60

61
  @spec stop(id) :: :ok | {:error, :tenant_not_found}
  # Stops the globally registered supervisor for `id`, if any.
  def stop(id) do
    case get_global_sup(id) do
      nil ->
        {:error, :tenant_not_found}

      sup_pid ->
        Supervisor.stop(sup_pid)
    end
  end
68

69
  @spec get_local_workers(id) :: {:ok, workers} | {:error, :worker_not_found}
  # Looks up both local worker processes (manager + pool) for `id`.
  # Fails if either one is missing — a half-started tenant is unusable.
  def get_local_workers(id) do
    manager = get_local_manager(id)
    pool = get_local_pool(id)

    if is_nil(manager) or is_nil(pool) do
      Logger.error("Could not get workers for tenant #{inspect(id)}")
      {:error, :worker_not_found}
    else
      {:ok, %{manager: manager, pool: pool}}
    end
  end
83

84
  @spec subscribe_local(pid, id) :: {:ok, subscribe_opts} | {:error, any()}
  # Subscribes `pid` to the local manager for `id`; on success returns the
  # worker pids plus the cached parameter status and idle timeout.
  # Any failure from the lookup or subscription falls through unchanged.
  def subscribe_local(pid, id) do
    with {:ok, %{manager: manager} = workers} <- get_local_workers(id),
         {:ok, ps, idle_timeout} <- Manager.subscribe(manager, pid) do
      {:ok, %{workers: workers, ps: ps, idle_timeout: idle_timeout}}
    end
  end
91

92
  @spec subscribe(pid, id, pid) :: {:ok, subscribe_opts} | {:error, any()}
  # Subscribes `pid` to the tenant owning supervisor `sup`, transparently
  # routing through RPC when the supervisor lives on another node.
  def subscribe(sup, id, pid \\ self()) do
    case node(sup) do
      local when local == node() ->
        subscribe_local(pid, id)

      remote ->
        Helpers.rpc(remote, __MODULE__, :subscribe_local, [pid, id], 15_000)
    end
  end
102

103
  @spec get_global_sup(id) :: pid | nil
  # Resolves the cluster-wide :syn registration for `id`, normalizing
  # :undefined to nil for easier pattern matching at call sites.
  def get_global_sup(id) do
    case :syn.whereis_name({:tenants, id}) do
      pid when is_pid(pid) -> pid
      :undefined -> nil
    end
  end
110

111
  @doc """
  During netsplits, or due to certain internal conflicts, :syn may store inconsistent data across the cluster.
  This function terminates all connection trees related to a specific tenant.
  """
  @spec dirty_terminate(String.t(), pos_integer()) :: map()
  def dirty_terminate(tenant, timeout \\ 15_000) do
    # One entry per registered tenant supervisor, keyed by user name.
    for {sup_pid, %{user: user, mode: _mode}} <-
          Registry.lookup(Supavisor.Registry.TenantSups, tenant),
        into: %{} do
      # Best-effort stop: report rather than propagate any exit/throw/error.
      stop_result =
        try do
          Supervisor.stop(sup_pid, :shutdown, timeout)
        catch
          kind, reason -> {:error, {kind, reason}}
        end

      {user, %{stop: stop_result, cache: del_all_cache(tenant, user)}}
    end
  end
134

135
  # Runs dirty_terminate/1 for `tenant` on every node in the cluster
  # (including the local one). Returns one :erpc result per node, in the
  # same order as `[node() | Node.list()]`.
  @spec terminate_global(String.t()) ::
          [{:ok, map()} | {:error | :exit | :throw, term()}]
  def terminate_global(tenant) do
    :erpc.multicall([node() | Node.list()], Supavisor, :dirty_terminate, [tenant], 60_000)
  end
138

139
  @spec del_all_cache(String.t(), String.t()) :: [map()]
  # Deletes every Cachex entry scoped to this tenant/user pair and returns
  # one %{key => delete_result} map per removed key (both inspected strings).
  def del_all_cache(tenant, user) do
    Logger.info("Deleting all cache for tenant #{tenant} and user #{user}")
    {:ok, keys} = Cachex.keys(Supavisor.Cache)

    drop = fn key ->
      %{inspect(key) => inspect(Cachex.del(Supavisor.Cache, key))}
    end

    # Only keys whose tenant (and, where present, user) components match.
    keys
    |> Enum.filter(fn
      {:metrics, ^tenant} -> true
      {:secrets, ^tenant, ^user} -> true
      {:user_cache, _, ^user, ^tenant, _} -> true
      {:tenant_cache, ^tenant, _} -> true
      {:pool_config_cache, ^tenant, ^user} -> true
      _ -> false
    end)
    |> Enum.map(drop)
    |> Enum.reverse()
  end
158

159
  @spec del_all_cache(String.t()) :: [map()]
  # Deletes every Cachex entry belonging to `tenant` (any user), walking the
  # backing ETS table directly. Returns one %{key => delete_result} map per
  # removed key; unexpected (non-:entry) records are logged and skipped.
  def del_all_cache(tenant) do
    Logger.info("Deleting all cache for tenant #{tenant}")

    drop = fn key, acc ->
      deleted = Cachex.del(Supavisor.Cache, key)
      [%{inspect(key) => inspect(deleted)} | acc]
    end

    :ets.foldl(
      fn
        {:entry, key, _, _, _result}, acc ->
          tenant_key? =
            match?({:metrics, ^tenant}, key) or
              match?({:secrets, ^tenant, _}, key) or
              match?({:user_cache, _, _, ^tenant, _}, key) or
              match?({:tenant_cache, ^tenant, _}, key) or
              match?({:pool_config_cache, ^tenant, _}, key)

          if tenant_key?, do: drop.(key, acc), else: acc

        other, acc ->
          Logger.error("Unknown key: #{inspect(other)}")
          acc
      end,
      [],
      Supavisor.Cache
    )
  end
188

189
  @spec del_all_cache_dist(String.t(), pos_integer()) :: [map()]
  # Runs del_all_cache/1 for `tenant` on every node in the cluster and
  # returns one %{node_name => result} map per node.
  def del_all_cache_dist(tenant, timeout \\ 15_000) do
    Logger.info("Deleting all dist cache for tenant #{tenant}")

    Enum.map([node() | Node.list()], fn target ->
      %{to_string(target) => :erpc.call(target, Supavisor, :del_all_cache, [tenant], timeout)}
    end)
  end
197

198
  @spec get_local_pool(id) :: map | pid | nil
  # Finds the locally registered pool(s) for `id`.
  # Exactly one registration -> its pid; several (replicas) -> a map grouped
  # by replica type; none -> nil.
  def get_local_pool(id) do
    match_head = {{:pool, :_, :_, id}, :"$2", :"$3"}
    return = [{{:"$2", :"$3"}}]

    case Registry.select(@registry, [{match_head, [], return}]) do
      [] ->
        nil

      [{pool_pid, _type}] ->
        pool_pid

      pools ->
        # transform [{pid1, :read}, {pid2, :read}, {pid3, :write}]
        # to %{read: [pid1, pid2], write: [pid3]}
        Enum.group_by(pools, &elem(&1, 1), &elem(&1, 0))
    end
  end
216

217
  @spec get_local_manager(id) :: pid | nil
  # Returns the locally registered Manager process for `id`, or nil.
  def get_local_manager(id) do
    case Registry.lookup(@registry, {:manager, id}) do
      [{manager_pid, _value}] -> manager_pid
      _ -> nil
    end
  end
224

225
  @spec id({:single | :cluster, String.t()}, String.t(), mode, mode, String.t(), String.t() | nil) ::
          id
  # Builds the canonical tenant id tuple.
  # temporary hack: connections arriving on the transaction port adopt the
  # user's configured mode; any other port keeps its own mode.
  def id(tenant, user, port_mode, user_mode, db_name, search_path) do
    effective_mode = if port_mode == :transaction, do: user_mode, else: port_mode

    {tenant, user, effective_mode, db_name, search_path}
  end
238

239
  # Accessors into the id tuple: {{type, tenant}, user, mode, db_name, search_path}.

  @spec tenant(id) :: String.t()
  def tenant({{_type, external_id}, _user, _mode, _db, _search_path}), do: external_id

  @spec mode(id) :: atom()
  def mode({_tenant, _user, pool_mode, _db, _search_path}), do: pool_mode

  @spec search_path(id) :: String.t() | nil
  def search_path({_tenant, _user, _mode, _db, path}), do: path
247

248
  @spec determine_node(id, String.t() | nil) :: Node.t()
  # Deterministically picks the node that should own the pool for `id`.
  def determine_node(id, availability_zone) do
    tenant_id = tenant(id)

    # If the AWS zone group is empty, we will use all nodes.
    # If the AWS zone group exists with the same zone, we will use nodes from this group.
    #   :syn.members(:availability_zone, "1c")
    #   [{#PID<0.381.0>, [node: :"node1@127.0.0.1"]}]
    candidates =
      with zone when is_binary(zone) <- availability_zone,
           [_ | _] = members <- :syn.members(:availability_zone, zone) do
        for {_pid, [node: member_node]} <- members, do: member_node
      else
        _ -> [node() | Node.list()]
      end

    # Hash the tenant onto the sorted candidate list so every node agrees
    # on the same owner regardless of local list ordering.
    sorted = Enum.sort(candidates)
    Enum.at(sorted, :erlang.phash2(tenant_id, length(sorted)))
  end
271

272
  @spec try_start_local_pool(id, secrets, atom()) :: {:ok, pid} | {:error, any}
  # Starts a local pool unless the tenant already runs @max_pools pools.
  def try_start_local_pool(id, secrets, log_level) do
    if count_pools(tenant(id)) < @max_pools do
      start_local_pool(id, secrets, log_level)
    else
      {:error, :max_pools_reached}
    end
  end
278

279
  @spec start_local_pool(id, secrets, atom()) :: {:ok, pid} | {:error, any}
  # Starts the TenantSupervisor (and its pools) for `id` on this node.
  # Loads the tenant (or cluster) configuration, builds one supervisor-args
  # map per replica, and treats an :already_started race as success.
  def start_local_pool(
        {{type, tenant}, _user, _mode, _db_name, _search_path} = id,
        secrets,
        log_level \\ nil
      ) do
    Logger.info("Starting pool(s) for #{inspect(id)}")

    # secrets is {method, fun}; the fun yields credentials incl. the alias.
    user = elem(secrets, 1).().alias

    tenant_config =
      case type do
        :single -> Tenants.get_pool_config_cache(tenant, user)
        :cluster -> Tenants.get_cluster_config(tenant, user)
      end

    case tenant_config do
      [_ | _] = replicas ->
        replica_args =
          Enum.map(replicas, fn
            %Tenants.ClusterTenants{tenant: cluster_tenant, type: replica_type} ->
              cluster_tenant
              |> Map.put(:replica_type, replica_type)
              |> supervisor_args(id, secrets, log_level)

            %Tenants.Tenant{} = single_tenant ->
              single_tenant
              |> Map.put(:replica_type, :write)
              |> supervisor_args(id, secrets, log_level)
          end)

        start_result =
          DynamicSupervisor.start_child(
            {:via, PartitionSupervisor, {Supavisor.DynamicSupervisor, id}},
            {Supavisor.TenantSupervisor, %{id: id, replicas: replica_args, log_level: log_level}}
          )

        # A concurrent starter winning the race still counts as success.
        case start_result do
          {:error, {:already_started, pid}} -> {:ok, pid}
          other -> other
        end

      error ->
        Logger.error("Can't find tenant with external_id #{inspect(id)} #{inspect(error)}")

        {:error, :tenant_not_found}
    end
  end
322

323
  ## Internal functions
324

325
  # Builds the configuration map consumed by Supavisor.TenantSupervisor for a
  # single replica: connection/auth details plus pool sizing and mode.
  defp supervisor_args(
         tenant_record,
         {tenant, user, mode, db_name, _search_path} = id,
         {method, secrets},
         log_level
       ) do
    # Assertive destructure: a tenant record with anything other than exactly
    # one entry in :users is a bug and should crash here.
    %{
      db_host: db_host,
      db_port: db_port,
      db_database: db_database,
      auth_query: auth_query,
      default_parameter_status: ps,
      ip_version: ip_ver,
      default_pool_size: def_pool_size,
      default_max_clients: def_max_clients,
      client_idle_timeout: client_idle_timeout,
      replica_type: replica_type,
      sni_hostname: sni_hostname,
      users: [
        %{
          db_user: db_user,
          db_password: db_pass,
          pool_size: pool_size,
          db_user_alias: alias,
          # mode_type: mode_type,
          max_clients: max_clients
        }
      ]
    } = tenant_record

    # With :auth_query the limits come from the tenant-wide defaults;
    # otherwise the user record's own limits apply.
    {pool_size, max_clients} =
      if method == :auth_query do
        {def_pool_size, def_max_clients}
      else
        {pool_size, max_clients}
      end

    auth = %{
      host: String.to_charlist(db_host),
      sni_hostname: if(sni_hostname != nil, do: to_charlist(sni_hostname)),
      port: db_port,
      user: db_user,
      alias: alias,
      auth_query: auth_query,
      # An explicit database name from the client id wins over the tenant default.
      database: if(db_name != nil, do: db_name, else: db_database),
      # Wrapped in a closure so the password doesn't leak via inspect/logs.
      password: fn -> db_pass end,
      application_name: "Supavisor",
      ip_version: Helpers.ip_version(ip_ver, db_host),
      upstream_ssl: tenant_record.upstream_ssl,
      upstream_verify: tenant_record.upstream_verify,
      upstream_tls_ca: Helpers.upstream_cert(tenant_record.upstream_tls_ca),
      require_user: tenant_record.require_user,
      method: method,
      secrets: secrets
    }

    %{
      id: id,
      tenant: tenant,
      replica_type: replica_type,
      user: user,
      auth: auth,
      pool_size: pool_size,
      mode: mode,
      default_parameter_status: ps,
      max_clients: max_clients,
      client_idle_timeout: client_idle_timeout,
      log_level: log_level
    }
  end
395

396
  @spec set_parameter_status(id, [{binary, binary}]) ::
          :ok | {:error, :not_found}
  # Pushes the upstream parameter status to the local manager for `id`.
  def set_parameter_status(id, ps) do
    case get_local_manager(id) do
      manager when is_pid(manager) -> Manager.set_parameter_status(manager, ps)
      nil -> {:error, :not_found}
    end
  end
404

405
  @spec get_pool_ranch(id) :: {:ok, map()} | {:error, :not_found}
  # Fetches the :syn metadata (host/port of the ranch listener) for `id`.
  def get_pool_ranch(id) do
    with {_sup_pid, %{port: _port, host: _host} = meta} <- :syn.lookup(:tenants, id) do
      {:ok, meta}
    else
      _ -> {:error, :not_found}
    end
  end
412

413
  @spec get_local_server(id, atom) :: map()
  # Returns the host/port of this node's internal pg proxy listener for `id`,
  # hashing the id onto one of the configured local proxy shards.
  def get_local_server(id, mode) do
    shard = :erlang.phash2(id, Application.fetch_env!(:supavisor, :local_proxy_shards))

    %{
      host: Application.get_env(:supavisor, :node_host),
      port: :ranch.get_port({:pg_proxy_internal, mode, shard})
    }
  end
420

421
  @spec count_pools(String.t()) :: non_neg_integer()
  # Number of pool supervisors currently registered for this tenant.
  def count_pools(tenant) do
    Registry.count_match(Supavisor.Registry.TenantSups, tenant, :_)
  end
424
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc