• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supavisor / 19370957114

14 Nov 2025 04:30PM UTC coverage: 62.682% (+1.4%) from 61.246%
19370957114

Pull #744

github

web-flow
Merge fd252a012 into 0224a24c8
Pull Request #744: fix(defrag): improve statems, caching, logs, circuit breaking

592 of 785 new or added lines in 22 files covered. (75.41%)

18 existing lines in 5 files now uncovered.

1809 of 2886 relevant lines covered (62.68%)

4508.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.23
/lib/supavisor.ex
1
defmodule Supavisor do
2
  @moduledoc false
3

4
  require Logger
5

6
  alias Supavisor.{
7
    Helpers,
8
    Manager,
9
    Tenants
10
  }
11

12
  @typedoc "A plain TCP socket, tagged with the `:gen_tcp` transport module."
  @type tcp_sock :: {:gen_tcp, :gen_tcp.socket()}

  @typedoc "A TLS socket, tagged with the `:ssl` transport module."
  @type ssl_sock :: {:ssl, :ssl.sslsocket()}

  @typedoc "Either kind of tagged socket."
  @type sock :: tcp_sock() | ssl_sock()

  @typedoc "The manager and pool processes returned by `get_local_workers/1`."
  @type workers :: %{manager: pid, pool: pid}

  @typedoc """
  An auth method tag paired with a function; `start_local_pool/3` calls the
  function with no arguments and reads `:user` from the result.
  """
  @type secrets :: {:password | :auth_query, fun()}

  @typedoc "The pooling mode that is part of a pool `t:id/0`."
  @type mode :: :transaction | :session | :native | :proxy

  @typedoc """
  Pool identifier: `{{type, tenant}, user, mode, db_name, search_path}`, as
  built by `id/6`.
  """
  @type id :: {{:single | :cluster, String.t()}, String.t(), mode, String.t(), String.t() | nil}

  @typedoc "The successful payload of `subscribe_local/2` and `subscribe/3`."
  @type subscribe_opts :: %{workers: workers, ps: list, idle_timeout: integer}

  # Registry holding the per-pool `{:manager, id}` and `{:pool, _, _, id}` entries.
  @registry Supavisor.Registry.Tenants

  # Upper bound on pools per tenant, read at compile time (default 20).
  @max_pools Application.compile_env(:supavisor, :max_pools, 20)
23

24
  @doc """
  Ensures a pool supervisor for `id` is running somewhere in the cluster.

  When a supervisor is already registered for `id`, its pid is returned.
  Otherwise the pool is started on `:force_node` if that option is set, or on
  the node chosen by `determine_node/2`. A node other than the current one is
  reached through `Helpers.rpc/4`.

  ## Options

    * `:log_level` - forwarded to `try_start_local_pool/3` (default `nil`)
    * `:force_node` - node on which to start the pool (default `false`)
    * `:availability_zone` - forwarded to `determine_node/2` (default `nil`)

  Unknown option keys make `Keyword.validate!/2` raise.
  """
  @spec start_dist(id, secrets, keyword()) :: {:ok, pid()} | {:error, any()}
  def start_dist(id, secrets, options \\ []) do
    opts = Keyword.validate!(options, log_level: nil, force_node: false, availability_zone: nil)
    log_level = Keyword.fetch!(opts, :log_level)
    force_node = Keyword.fetch!(opts, :force_node)
    availability_zone = Keyword.fetch!(opts, :availability_zone)

    case get_global_sup(id) do
      nil ->
        target = force_node || determine_node(id, availability_zone)
        start_pool_on(target, id, secrets, log_level)

      pid ->
        {:ok, pid}
    end
  end

  # Starts the pool in-process when `target` is this node, otherwise over RPC.
  defp start_pool_on(target, id, secrets, log_level) when target == node() do
    Logger.debug("Starting local pool for #{inspect(id)}")
    try_start_local_pool(id, secrets, log_level)
  end

  defp start_pool_on(target, id, secrets, log_level) do
    Logger.debug("Starting remote pool for #{inspect(id)}")
    Helpers.rpc(target, __MODULE__, :try_start_local_pool, [id, secrets, log_level])
  end
49

50
  @doc """
  Returns the supervisor registered for `id`, starting a pool on the current
  node (with a `nil` log level) when none is registered.
  """
  @spec start(id, secrets) :: {:ok, pid} | {:error, any}
  def start(id, secrets) do
    id
    |> get_global_sup()
    |> case do
      nil -> try_start_local_pool(id, secrets, nil)
      sup -> {:ok, sup}
    end
  end
60

61
  @doc """
  Stops the supervisor registered for `id`.

  Returns `{:error, :tenant_not_found}` when no supervisor is registered.
  """
  @spec stop(id) :: :ok | {:error, :tenant_not_found}
  def stop(id) do
    sup = get_global_sup(id)

    if is_nil(sup) do
      {:error, :tenant_not_found}
    else
      Supervisor.stop(sup)
    end
  end
68

69
  @doc """
  Looks up the manager and pool registered on this node for `id`.

  Logs an error and returns `{:error, :worker_not_found}` if either lookup
  yields `nil`.
  """
  @spec get_local_workers(id) :: {:ok, workers} | {:error, :worker_not_found}
  def get_local_workers(id) do
    manager = get_local_manager(id)
    pool = get_local_pool(id)

    if is_nil(manager) or is_nil(pool) do
      Logger.error("Could not get workers for tenant #{inspect(id)}")
      {:error, :worker_not_found}
    else
      {:ok, %{manager: manager, pool: pool}}
    end
  end
83

84
  @doc """
  Subscribes `pid` to the manager of the pool `id` running on this node.

  On success returns the local workers together with the `ps` and
  `idle_timeout` values handed back by `Manager.subscribe/2`.

  Any result that does not match the happy path is returned unchanged. That
  includes `{:error, :worker_not_found}` from `get_local_workers/1` as well as
  whatever error `Manager.subscribe/2` produced.
  """
  @spec subscribe_local(pid, id) ::
          {:ok, subscribe_opts}
          | {:error, :worker_not_found}
          | {:error, :max_clients_reached}
          | {:error, :terminating, term()}
  def subscribe_local(pid, id) do
    with {:ok, workers} <- get_local_workers(id),
         {:ok, ps, idle_timeout} <- Manager.subscribe(workers.manager, pid) do
      {:ok, %{workers: workers, ps: ps, idle_timeout: idle_timeout}}
    end
  end
94

95
  @doc """
  Subscribes `pid` (the caller by default) to the pool `id` supervised by `sup`.

  The subscription always happens on the node that hosts `sup`: directly via
  `subscribe_local/2` when that is the current node, otherwise through
  `Helpers.rpc/5` with a 15 second timeout.

  Errors from `subscribe_local/2`, including `{:error, :worker_not_found}`,
  are returned as-is.
  NOTE(review): failures of the RPC itself are returned in whatever shape
  `Helpers.rpc/5` uses, which is not visible from this module.
  """
  @spec subscribe(pid, id, pid) ::
          {:ok, subscribe_opts}
          | {:error, :worker_not_found}
          | {:error, :max_clients_reached}
          | {:error, :terminating, term()}
  def subscribe(sup, id, pid \\ self()) do
    dest_node = node(sup)

    if node() == dest_node do
      subscribe_local(pid, id)
    else
      Helpers.rpc(dest_node, __MODULE__, :subscribe_local, [pid, id], 15_000)
    end
  end
108

109
  @doc """
  Returns the pid registered in `:syn` under `{:tenants, id}`, or `nil` when
  nothing is registered.
  """
  @spec get_global_sup(id) :: pid | nil
  def get_global_sup(id) do
    # Anything other than `:undefined` falls through `with` untouched.
    with :undefined <- :syn.whereis_name({:tenants, id}), do: nil
  end
116

117
  @doc """
  Terminates every connection tree registered on this node for `tenant`.

  During netsplits, or due to certain internal conflicts, :syn may store
  inconsistent data across the cluster, so this works from the local
  `Supavisor.Registry.TenantSups` entries instead.

  Each supervisor is stopped with reason `:shutdown` and the given `timeout`;
  anything thrown, raised, or exited while stopping is captured as
  `{:error, {kind, reason}}`. The caches for the tenant/user pair are deleted
  afterwards.

  Returns a map of `user => %{stop: result, cache: deleted}`. Entries sharing
  the same user overwrite each other; the last one wins.
  """
  @spec dirty_terminate(String.t(), pos_integer()) :: map()
  def dirty_terminate(tenant, timeout \\ 15_000) do
    Supavisor.Registry.TenantSups
    |> Registry.lookup(tenant)
    |> Map.new(fn {sup, %{user: user, mode: _mode}} ->
      stop_result =
        try do
          Supervisor.stop(sup, :shutdown, timeout)
        catch
          kind, reason -> {:error, {kind, reason}}
        end

      {user, %{stop: stop_result, cache: del_all_cache(tenant, user)}}
    end)
  end
140

141
  @doc """
  Runs `dirty_terminate/2` for `tenant` on this node and on every connected node.

  The calls are issued with `:erpc.multicall/5` under a 60 second timeout. The
  result is a list with one element per node, in the order
  `[node() | Node.list()]`: `{:ok, map}` for a node that answered, or
  `{class, reason}` for a node on which the call failed.
  """
  @spec terminate_global(String.t()) :: [{:ok, map()} | {:error | :exit | :throw, term()}]
  def terminate_global(tenant) do
    :erpc.multicall([node() | Node.list()], __MODULE__, :dirty_terminate, [tenant], 60_000)
  end
144

145
  @doc """
  Updates credentials for all SecretChecker processes for a tenant across the cluster.
  Used for auth_query mode (require_user: false) to hot-update credentials without restarting pools.

  Runs `update_secret_checker_credentials_local/3` on this node and on every
  connected node through `:erpc.multicall/5` with a 60 second timeout.

  The result has one element per node, in the order `[node() | Node.list()]`:
  `{:ok, map}` for a node that answered, or `{class, reason}` for a node on
  which the call failed. Elements are not tagged with the node name.
  """
  @spec update_secret_checker_credentials_global(String.t(), String.t(), (-> String.t())) :: [
          {:ok, map()} | {:error | :exit | :throw, term()}
        ]
  def update_secret_checker_credentials_global(tenant, new_user, password_fn) do
    :erpc.multicall(
      [node() | Node.list()],
      Supavisor,
      :update_secret_checker_credentials_local,
      [tenant, new_user, password_fn],
      60_000
    )
  end
161

162
  @doc """
  Updates the SecretChecker credentials of every pool registered on this node
  for `tenant`.

  The pool id is rebuilt from each `Supavisor.Registry.TenantSups` entry and
  passed to `Supavisor.SecretChecker.update_credentials/3`. Returns a map of
  `{user, mode} => result`.
  """
  @spec update_secret_checker_credentials_local(String.t(), String.t(), (-> String.t())) :: map()
  def update_secret_checker_credentials_local(tenant, new_user, password_fn) do
    Supavisor.Registry.TenantSups
    |> Registry.lookup(tenant)
    |> Map.new(fn {_sup,
                   %{
                     user: user,
                     mode: mode,
                     type: type,
                     db_name: db_name,
                     search_path: search_path
                   }} ->
      pool_id = {{type, tenant}, user, mode, db_name, search_path}
      {{user, mode}, Supavisor.SecretChecker.update_credentials(pool_id, new_user, password_fn)}
    end)
  end
179

180
  @doc """
  Deletes the `Supavisor.Cache` entries that belong to `tenant` and `user`.

  Metrics and tenant-level keys are matched on `tenant` alone; secrets, user
  and pool-config keys must match both `tenant` and `user`.

  Returns one single-entry map per deleted key, mapping the inspected key to
  the inspected result of `Cachex.del/2`.
  """
  @spec del_all_cache(String.t(), String.t()) :: [map()]
  def del_all_cache(tenant, user) do
    Logger.info("Deleting all cache for tenant #{tenant} and user #{user}")
    {:ok, cache_keys} = Cachex.keys(Supavisor.Cache)

    # True for every cache key owned by this tenant/user pair.
    owned? = fn
      {:metrics, ^tenant} -> true
      {:secrets_for_validation, ^tenant, ^user} -> true
      {:secrets_for_upstream_auth, ^tenant, ^user} -> true
      {:secrets_check, ^tenant, ^user} -> true
      {:user_cache, _, ^user, ^tenant, _} -> true
      {:tenant_cache, ^tenant, _} -> true
      {:pool_config_cache, ^tenant, ^user} -> true
      _other -> false
    end

    for key <- cache_keys, owned?.(key), reduce: [] do
      deleted -> [%{inspect(key) => inspect(Cachex.del(Supavisor.Cache, key))} | deleted]
    end
  end
201

202
  @doc """
  Deletes every `Supavisor.Cache` entry that belongs to `tenant`, for any user.

  The cache's ETS table is folded directly. Rows that are not `:entry`
  records are logged as errors and skipped.

  Returns one single-entry map per deleted key, mapping the inspected key to
  the inspected result of `Cachex.del/2`.
  """
  @spec del_all_cache(String.t()) :: [map()]
  def del_all_cache(tenant) do
    Logger.info("Deleting all cache for tenant #{tenant}")

    # True for every cache key owned by this tenant, whatever the user.
    tenant_key? = fn
      {:metrics, ^tenant} -> true
      {:secrets_for_validation, ^tenant, _} -> true
      {:secrets_for_upstream_auth, ^tenant, _} -> true
      {:secrets_check, ^tenant, _} -> true
      {:user_cache, _, _, ^tenant, _} -> true
      {:tenant_cache, ^tenant, _} -> true
      {:pool_config_cache, ^tenant, _} -> true
      _other -> false
    end

    visit = fn
      {:entry, key, _, _, _}, deleted ->
        if tenant_key?.(key) do
          [%{inspect(key) => inspect(Cachex.del(Supavisor.Cache, key))} | deleted]
        else
          deleted
        end

      other, deleted ->
        Logger.error("Unknown key: #{inspect(other)}")
        deleted
    end

    :ets.foldl(visit, [], Supavisor.Cache)
  end
233

234
  @doc """
  Runs `del_all_cache/1` for `tenant` on this node and every connected node,
  one node after another.

  Returns one map per node, keyed by the node name as a string. Each call uses
  `:erpc.call/5` with `timeout`; its failures are not caught here.
  """
  @spec del_all_cache_dist(String.t(), pos_integer()) :: [map()]
  def del_all_cache_dist(tenant, timeout \\ 15_000) do
    Logger.info("Deleting all dist cache for tenant #{tenant}")

    Enum.map([node() | Node.list()], fn member ->
      %{to_string(member) => :erpc.call(member, Supavisor, :del_all_cache, [tenant], timeout)}
    end)
  end
242

243
  @doc """
  Returns the pool process(es) registered on this node for `id`.

    * a single pid when exactly one pool is registered
    * a map such as `%{read: [pid1, pid2], write: [pid3]}` when several are,
      grouped by the value stored with each registration
    * `nil` when none is registered
  """
  @spec get_local_pool(id) :: map | pid | nil
  def get_local_pool(id) do
    # Select `{pid, value}` for every `{:pool, _, _, id}` key.
    select_spec = [{{{:pool, :_, :_, id}, :"$2", :"$3"}, [], [{{:"$2", :"$3"}}]}]

    case Registry.select(@registry, select_spec) do
      [{pool, _type}] ->
        pool

      [_ | _] = pools ->
        Enum.group_by(pools, fn {_pid, type} -> type end, fn {pid, _type} -> pid end)

      _none ->
        nil
    end
  end
261

262
  @doc """
  Returns the manager pid registered on this node under `{:manager, id}`, or
  `nil` unless exactly one such entry exists.
  """
  @spec get_local_manager(id) :: pid | nil
  def get_local_manager(id) do
    with [{manager, _value}] <- Registry.lookup(@registry, {:manager, id}) do
      manager
    else
      _ -> nil
    end
  end
269

270
  @doc """
  Builds a pool id from its components.

  The id's mode is `user_mode` when the connection arrived on a `:transaction`
  port, and `port_mode` otherwise.
  """
  @spec id({:single | :cluster, String.t()}, String.t(), mode, mode, String.t(), String.t() | nil) ::
          id
  def id(tenant, user, port_mode, user_mode, db_name, search_path) do
    # temporary hack
    effective_mode =
      case port_mode do
        :transaction -> user_mode
        other -> other
      end

    {tenant, user, effective_mode, db_name, search_path}
  end
283

284
  @doc "Extracts the tenant name from a pool id."
  @spec tenant(id) :: String.t()
  def tenant({{_type, name}, _user, _mode, _db_name, _search_path}) do
    name
  end
74✔
286

287
  @doc "Extracts the mode from a pool id."
  @spec mode(id) :: atom()
  def mode({_tenant, _user, pool_mode, _db_name, _search_path}) do
    pool_mode
  end
4✔
289

290
  @doc "Extracts the search path (possibly `nil`) from a pool id."
  @spec search_path(id) :: String.t() | nil
  def search_path({_tenant, _user, _mode, _db_name, path}) do
    path
  end
305✔
292

293
  @doc """
  Picks the node that should host the pool for `id`.

  Candidates are the members of the `:syn` group for `availability_zone` when
  the zone is a binary and that group is non-empty; otherwise every node in
  the cluster. The tenant name is hashed onto the sorted candidate list.
  """
  @spec determine_node(id, String.t() | nil) :: Node.t()
  def determine_node(id, availability_zone) do
    # If the AWS zone group is empty, we will use all nodes.
    # If the AWS zone group exists with the same zone, we will use nodes from this group.
    #   :syn.members(:availability_zone, "1c")
    #   [{#PID<0.381.0>, [node: :"node1@127.0.0.1"]}]
    zone_members =
      if is_binary(availability_zone) do
        :syn.members(:availability_zone, availability_zone)
      else
        []
      end

    candidates =
      case zone_members do
        [] -> [node() | Node.list()]
        members -> Enum.map(members, fn {_pid, [node: member]} -> member end)
      end

    slot = id |> tenant() |> :erlang.phash2(length(candidates))

    candidates
    |> Enum.sort()
    |> Enum.at(slot)
  end
316

317
  @doc """
  Starts a pool for `id` on this node unless the tenant already has
  `@max_pools` pools registered here, in which case
  `{:error, :max_pools_reached}` is returned.
  """
  @spec try_start_local_pool(id, secrets, atom()) :: {:ok, pid} | {:error, any}
  def try_start_local_pool(id, secrets, log_level) do
    pool_count = id |> tenant() |> count_pools()

    if pool_count >= @max_pools do
      {:error, :max_pools_reached}
    else
      start_local_pool(id, secrets, log_level)
    end
  end
323

324
  @doc """
  Starts the `Supavisor.TenantSupervisor` for `id` on this node.

  The user name is read from the map returned by calling the secrets function.
  The tenant configuration is then loaded with
  `Tenants.get_pool_config_cache/2` for `:single` ids or
  `Tenants.get_cluster_config/2` for `:cluster` ids.

  Returns `{:ok, pid}`, including when the supervisor was already started.
  Returns `{:error, :tenant_not_found}` when the configuration lookup does not
  yield a non-empty list. Any other `DynamicSupervisor.start_child/2` result
  is returned unchanged.
  """
  @spec start_local_pool(id, secrets, atom()) :: {:ok, pid} | {:error, any}
  def start_local_pool(
        {{type, tenant}, _user, _mode, _db_name, _search_path} = id,
        secrets,
        log_level \\ nil
      ) do
    Logger.info("Starting pool(s) for #{inspect(id)}")

    user = elem(secrets, 1).().user

    config =
      case type do
        :single -> Tenants.get_pool_config_cache(tenant, user)
        :cluster -> Tenants.get_cluster_config(tenant, user)
      end

    case config do
      [_ | _] = replicas ->
        child_arg = %{
          id: id,
          replicas: Enum.map(replicas, &replica_info/1),
          secrets: secrets,
          log_level: log_level
        }

        partition = {:via, PartitionSupervisor, {Supavisor.DynamicSupervisor, id}}

        case DynamicSupervisor.start_child(partition, {Supavisor.TenantSupervisor, child_arg}) do
          {:error, {:already_started, pid}} -> {:ok, pid}
          other -> other
        end

      error ->
        Logger.error("Can't find tenant with external_id #{inspect(id)} #{inspect(error)}")

        {:error, :tenant_not_found}
    end
  end

  # Extract only minimal info needed for pool creation.
  defp replica_info(replica) do
    case replica do
      %Tenants.ClusterTenants{tenant: tenant, type: replica_type} ->
        %{replica_type: replica_type, pool_size: replica_pool_size(tenant)}

      %Tenants.Tenant{} = tenant ->
        %{replica_type: :write, pool_size: replica_pool_size(tenant)}
    end
  end

  # Pool size of the tenant's first user, or the tenant default when it has none.
  defp replica_pool_size(tenant) do
    first_user = List.first(tenant.users)

    if first_user do
      first_user.pool_size
    else
      tenant.default_pool_size
    end
  end
381

382
  ## Internal functions
383

384
  @doc """
  Forwards the parameter status list `ps` to the local manager of `id`.

  Returns `{:error, :not_found}` when no manager is registered on this node.
  """
  @spec set_parameter_status(id, [{binary, binary}]) ::
          :ok | {:error, :not_found}
  def set_parameter_status(id, ps) do
    if manager = get_local_manager(id) do
      Manager.set_parameter_status(manager, ps)
    else
      {:error, :not_found}
    end
  end
392

393
  @doc """
  Returns the `:syn` metadata registered for `id` in the `:tenants` scope.

  The metadata is only returned when it is a map carrying both `:port` and
  `:host`; every other lookup result gives `{:error, :not_found}`.
  """
  @spec get_pool_ranch(id) :: {:ok, map()} | {:error, :not_found}
  def get_pool_ranch(id) do
    with {_sup, %{port: _, host: _} = meta} <- :syn.lookup(:tenants, id) do
      {:ok, meta}
    else
      _ -> {:error, :not_found}
    end
  end
400

401
  @doc """
  Returns the `%{host: host, port: port}` of the internal proxy listener that
  serves `id` on this node.

  The host comes from the `:node_host` application env. The listener shard is
  chosen by hashing `id` over the number of configured proxy ports for the
  id's mode. Only `:session` and `:transaction` are handled; any other mode
  raises `CaseClauseError`.
  """
  @spec get_local_server(id) :: map()
  def get_local_server({_, _, mode, _, _} = id) do
    host = Application.get_env(:supavisor, :node_host)

    ports_key =
      case mode do
        :session -> :session_proxy_ports
        :transaction -> :transaction_proxy_ports
      end

    shard_count = :supavisor |> Application.fetch_env!(ports_key) |> length()
    shard = :erlang.phash2(id, shard_count)

    %{host: host, port: :ranch.get_port({:pg_proxy_internal, mode, shard})}
  end
414

415
  @doc """
  Counts the entries registered under `tenant` in
  `Supavisor.Registry.TenantSups` on this node.
  """
  @spec count_pools(String.t()) :: non_neg_integer()
  def count_pools(tenant) do
    Registry.count_match(Supavisor.Registry.TenantSups, tenant, :_)
  end
37✔
418
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc