• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supavisor / 21579574864

02 Feb 2026 06:20AM UTC coverage: 65.58% (+0.08%) from 65.5%
21579574864

push

github

web-flow
fix: invalidate cache on tenant creation (#842)

Previously, if a client tried to connect before a tenant was created,
the `get_user_cache` function would cache the error. After the tenant
was created, connection attempts would still fail until the cache TTL.
Now the cache is cleared after inserts.

Additionally, cache and pool termination logic has been moved from the
controller to the domain module.

16 of 16 new or added lines in 1 file covered. (100.0%)

2 existing lines in 2 files now uncovered.

1932 of 2946 relevant lines covered (65.58%)

4216.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.75
/lib/supavisor.ex
1
defmodule Supavisor do
  @moduledoc false

  require Logger

  alias Supavisor.{
    Helpers,
    Manager,
    Tenants
  }

  # Sockets are tagged with their transport module so callers can dispatch
  # :gen_tcp vs :ssl operations without re-inspecting the socket.
  @type sock :: tcp_sock() | ssl_sock()
  @type ssl_sock :: {:ssl, :ssl.sslsocket()}
  @type tcp_sock :: {:gen_tcp, :gen_tcp.socket()}
  # The two worker pids that make up one running tenant pool on a node.
  @type workers :: %{manager: pid, pool: pid}
  # Secrets are produced lazily by a zero-arity fun (see start_local_pool/3),
  # keeping raw credentials out of long-lived process state.
  @type secrets :: {:password | :auth_query, fun()}
  @type mode :: :transaction | :session | :native | :proxy
  # Canonical pool identity, built by id/6:
  # {{:single | :cluster, tenant}, user, mode, db_name, search_path}
  @type id :: {{:single | :cluster, String.t()}, String.t(), mode, String.t(), String.t() | nil}
  @type subscribe_opts :: %{workers: workers, ps: list, idle_timeout: integer}

  # Registry holding per-tenant manager/pool registrations on this node.
  @registry Supavisor.Registry.Tenants
  # Per-tenant pool cap; read at compile time on purpose (compile_env).
  @max_pools Application.compile_env(:supavisor, :max_pools, 20)
23

24
  # Returns the cluster-global pool supervisor for `id`, starting it on the
  # most appropriate node when none is registered yet.
  #
  # Options: `:log_level`, `:force_node` (pin the target node), and
  # `:availability_zone` (bias node selection, see determine_node/2).
  @spec start_dist(id, secrets, keyword()) :: {:ok, pid()} | {:error, any()}
  def start_dist(id, secrets, options \\ []) do
    opts = Keyword.validate!(options, log_level: nil, force_node: false, availability_zone: nil)

    case get_global_sup(id) do
      nil -> start_dist_pool(id, secrets, opts)
      pid -> {:ok, pid}
    end
  end

  # Picks the target node and starts the pool there — directly when it is the
  # local node, via RPC otherwise.
  defp start_dist_pool(id, secrets, opts) do
    log_level = Keyword.fetch!(opts, :log_level)
    forced = Keyword.fetch!(opts, :force_node)
    zone = Keyword.fetch!(opts, :availability_zone)

    target = if forced, do: forced, else: determine_node(id, zone)

    if target == node() do
      Logger.debug("Starting local pool for #{inspect(id)}")
      try_start_local_pool(id, secrets, log_level)
    else
      Logger.debug("Starting remote pool for #{inspect(id)}")
      Helpers.rpc(target, __MODULE__, :try_start_local_pool, [id, secrets, log_level])
    end
  end
49

50
  # Starts a pool on the current node, unless a global supervisor for `id`
  # already exists — then its pid is returned as-is.
  @spec start(id, secrets) :: {:ok, pid} | {:error, any}
  def start(id, secrets) do
    if pid = get_global_sup(id) do
      {:ok, pid}
    else
      try_start_local_pool(id, secrets, nil)
    end
  end
60

61
  # Stops the tenant's global supervision tree, if one is registered.
  @spec stop(id) :: :ok | {:error, :tenant_not_found}
  def stop(id) do
    id
    |> get_global_sup()
    |> case do
      nil -> {:error, :tenant_not_found}
      pid -> Supervisor.stop(pid)
    end
  end
71

72
  # Looks up the Manager and pool registered on this node for `id`.
  # Both must be present; otherwise the miss is logged and reported.
  @spec get_local_workers(id) :: {:ok, workers} | {:error, :worker_not_found}
  def get_local_workers(id) do
    manager = get_local_manager(id)
    pool = get_local_pool(id)

    if is_nil(manager) or is_nil(pool) do
      Logger.error("Could not get workers for tenant #{inspect(id)}")
      {:error, :worker_not_found}
    else
      {:ok, %{manager: manager, pool: pool}}
    end
  end
86

87
  # Subscribes `pid` to the tenant's Manager on this node and returns the
  # worker pids plus the parameter-status/idle-timeout info needed by the
  # client handler.
  @spec subscribe_local(pid, id) ::
          {:ok, subscribe_opts}
          | {:error, :max_clients_reached}
          | {:error, :terminating, term()}
  def subscribe_local(pid, id) do
    with {:ok, %{manager: manager} = workers} <- get_local_workers(id),
         {:ok, ps, idle_timeout} <- Manager.subscribe(manager, pid) do
      {:ok, %{workers: workers, ps: ps, idle_timeout: idle_timeout}}
    end
  end
97

98
  # Subscribes `pid` to the pool supervised by `sup`, transparently routing
  # to the supervisor's node when it lives elsewhere in the cluster.
  @spec subscribe(pid, id, pid) ::
          {:ok, subscribe_opts}
          | {:error, :max_clients_reached}
          | {:error, :terminating, term()}
  def subscribe(sup, id, pid \\ self()) do
    case node(sup) do
      dest when dest == node() ->
        subscribe_local(pid, id)

      dest ->
        Helpers.rpc(dest, __MODULE__, :subscribe_local, [pid, id], 15_000)
    end
  end
111

112
  # Resolves the cluster-global supervisor pid for `id` via :syn, or nil
  # when no supervisor is registered anywhere.
  @spec get_global_sup(id) :: pid | nil
  def get_global_sup(id) do
    with :undefined <- :syn.whereis_name({:tenants, id}), do: nil
  end
119

120
  @doc """
  During netsplits, or due to certain internal conflicts, :syn may store inconsistent data across the cluster.
  This function terminates all connection trees related to a specific tenant.
  """
  @spec dirty_terminate(String.t(), pos_integer()) :: map()
  def dirty_terminate(tenant, timeout \\ 15_000) do
    Registry.lookup(Supavisor.Registry.TenantSups, tenant)
    |> Enum.reduce(%{}, fn {pid, %{user: user, mode: _mode}}, acc ->
      # Supervisor.stop/3 exits the caller on failure (e.g. the tree is
      # already down or times out); trap it so the remaining trees for this
      # tenant are still terminated, and report the reason instead.
      stop =
        try do
          Supervisor.stop(pid, :shutdown, timeout)
        catch
          error, reason -> {:error, {error, reason}}
        end

      # Per-user report: the stop outcome plus the cache keys purged for
      # this tenant/user pair.
      resp = %{
        stop: stop,
        cache: del_all_cache(tenant, user)
      }

      Map.put(acc, user, resp)
    end)
  end
143

144
  @doc """
  Runs `dirty_terminate/1` for `tenant` on every node in the cluster.

  Returns the `:erpc.multicall/5` per-node result list.
  """
  @spec terminate_global(String.t()) :: [term()]
  def terminate_global(tenant) do
    :erpc.multicall([node() | Node.list()], Supavisor, :dirty_terminate, [tenant], 60_000)
  end
147

148
  @doc """
  Updates credentials for all SecretChecker processes for a tenant across the cluster.
  Used for auth_query mode (require_user: false) to hot-update credentials without restarting pools.
  """
  @spec update_secret_checker_credentials_global(String.t(), String.t(), (-> String.t())) :: [
          {node(), term()}
        ]
  def update_secret_checker_credentials_global(tenant, new_user, password_fn) do
    # Fan the local update out to every connected node, including ourselves.
    nodes = [node() | Node.list()]
    args = [tenant, new_user, password_fn]

    :erpc.multicall(nodes, Supavisor, :update_secret_checker_credentials_local, args, 60_000)
  end
164

165
  # Applies a credential update to every SecretChecker registered on this
  # node for `tenant`. Returns a map of {user, mode} => update result.
  @spec update_secret_checker_credentials_local(String.t(), String.t(), (-> String.t())) :: map()
  def update_secret_checker_credentials_local(tenant, new_user, password_fn) do
    Supavisor.Registry.TenantSups
    |> Registry.lookup(tenant)
    |> Map.new(fn {_pid, meta} ->
      # Registry metadata carries the components of the pool id.
      %{user: user, mode: mode, type: type, db_name: db_name, search_path: search_path} = meta

      id = {{type, tenant}, user, mode, db_name, search_path}
      result = Supavisor.SecretChecker.update_credentials(id, new_user, password_fn)
      {{user, mode}, result}
    end)
  end
182

183
  # Removes every cache entry on this node that belongs to the given
  # tenant/user pair. Returns one %{key => delete-result} map per deleted key.
  @spec del_all_cache(String.t(), String.t()) :: [map()]
  def del_all_cache(tenant, user) do
    Logger.info("Deleting all cache for tenant #{tenant} and user #{user}")
    {:ok, keys} = Cachex.keys(Supavisor.Cache)

    keys
    |> Enum.filter(&tenant_user_key?(&1, tenant, user))
    |> Enum.reduce([], fn key, acc ->
      result = Cachex.del(Supavisor.Cache, key)
      [%{inspect(key) => inspect(result)} | acc]
    end)
  end

  # True when `key` is scoped to this tenant (and, where the key carries one,
  # this user). Repeated bindings in the head enforce equality, matching the
  # pinned patterns this replaced.
  defp tenant_user_key?({:metrics, tenant}, tenant, _user), do: true
  defp tenant_user_key?({:secrets_for_validation, tenant, user}, tenant, user), do: true
  defp tenant_user_key?({:secrets_check, tenant, user}, tenant, user), do: true
  defp tenant_user_key?({:user_cache, _, user, tenant, _}, tenant, user), do: true
  defp tenant_user_key?({:tenant_cache, tenant, _}, tenant, _user), do: true
  defp tenant_user_key?({:pool_config_cache, tenant, user}, tenant, user), do: true
  defp tenant_user_key?(_key, _tenant, _user), do: false
203

204
  # Removes every cache entry on this node that belongs to `tenant`,
  # regardless of user. Folds over the Cachex ETS table directly rather than
  # materialising the full key list via Cachex.keys/1.
  @spec del_all_cache(String.t()) :: [map()]
  def del_all_cache(tenant) do
    Logger.info("Deleting all cache for tenant #{tenant}")

    # Deletes one key and prepends a %{key => result} report to the acc.
    del = fn key, acc ->
      result = Cachex.del(Supavisor.Cache, key)
      [%{inspect(key) => inspect(result)} | acc]
    end

    :ets.foldl(
      fn
        # Cachex rows match {:entry, key, _, _, _} — only the key is needed
        # here; the remaining fields are Cachex internals (TODO: confirm the
        # record shape against the pinned Cachex version on upgrade).
        {:entry, key, _, _, _result}, acc ->
          case key do
            {:metrics, ^tenant} -> del.(key, acc)
            {:secrets_for_validation, ^tenant, _} -> del.(key, acc)
            {:secrets_check, ^tenant, _} -> del.(key, acc)
            {:user_cache, _, _, ^tenant, _} -> del.(key, acc)
            {:tenant_cache, ^tenant, _} -> del.(key, acc)
            {:pool_config_cache, ^tenant, _} -> del.(key, acc)
            # Key belongs to another tenant or an unrelated namespace.
            _ -> acc
          end

        # A row that is not an :entry record is unexpected; log and keep going
        # rather than aborting the fold.
        other, acc ->
          Logger.error("Unknown key: #{inspect(other)}")
          acc
      end,
      [],
      Supavisor.Cache
    )
  end
234

235
  # Fans `del_all_cache/1` out to every node in the cluster, returning one
  # %{node_name => result} map per node.
  @spec del_all_cache_dist(String.t(), pos_integer()) :: [map()]
  def del_all_cache_dist(tenant, timeout \\ 15_000) do
    Logger.info("Deleting all dist cache for tenant #{tenant}")

    Enum.map([node() | Node.list()], fn node ->
      %{to_string(node) => :erpc.call(node, Supavisor, :del_all_cache, [tenant], timeout)}
    end)
  end
243

244
  # Finds the pool process(es) registered on this node for `id`.
  #
  # Returns the single pid in the common one-pool case, a
  # %{replica_type => [pid]} map when several replicas are registered
  # (see the transform comment below), or nil when nothing matches.
  @spec get_local_pool(id) :: map | pid | nil
  def get_local_pool(id) do
    # Match spec over the registry: keys shaped {:pool, _, _, ^id} with
    # {pid, replica_type} extracted as {:"$2", :"$3"}.
    match = {{:pool, :_, :_, id}, :"$2", :"$3"}
    body = [{{:"$2", :"$3"}}]

    case Registry.select(@registry, [{match, [], body}]) do
      [{pool, _}] ->
        pool

      [_ | _] = pools ->
        # transform [{pid1, :read}, {pid2, :read}, {pid3, :write}]
        # to %{read: [pid1, pid2], write: [pid3]}
        Enum.group_by(pools, &elem(&1, 1), &elem(&1, 0))

      _ ->
        nil
    end
  end
262

263
  # Returns the Manager pid registered on this node for `id`, or nil.
  @spec get_local_manager(id) :: pid | nil
  def get_local_manager(id) do
    with [{pid, _meta}] <- Registry.lookup(@registry, {:manager, id}) do
      pid
    else
      _ -> nil
    end
  end
270

271
  # Builds the canonical pool id tuple from its components.
  #
  # When the listening port runs in :transaction mode the per-user mode wins;
  # any other port mode is used as-is.
  @spec id({:single | :cluster, String.t()}, String.t(), mode, mode, String.t(), String.t() | nil) ::
          id
  def id(tenant, user, port_mode, user_mode, db_name, search_path) do
    # temporary hack
    mode =
      case port_mode do
        :transaction -> user_mode
        other -> other
      end

    {tenant, user, mode, db_name, search_path}
  end
284

285
  # Component accessors for the id tuple produced by `id/6`.

  @spec tenant(id) :: String.t()
  def tenant({{_type, tenant}, _user, _mode, _db, _search_path}), do: tenant

  @spec mode(id) :: atom()
  def mode({_tenant, _user, mode, _db, _search_path}), do: mode

  @spec search_path(id) :: String.t() | nil
  def search_path({_tenant, _user, _mode, _db, search_path}), do: search_path
293

294
  # Picks the node that should host the pool for `id`.
  #
  # Candidates come from the :syn :availability_zone group for the caller's
  # zone, falling back to all connected nodes when no zone is configured or
  # the group is empty. The final choice is a stable hash of the tenant id
  # over the sorted candidate list, so every node agrees on the placement.
  @spec determine_node(id, String.t() | nil) :: Node.t()
  def determine_node(id, availability_zone) do
    candidates = candidate_nodes(availability_zone)
    index = :erlang.phash2(tenant(id), length(candidates))

    candidates
    |> Enum.sort()
    |> Enum.at(index)
  end

  # Nodes advertised for the given AWS zone, e.g.
  #   :syn.members(:availability_zone, "1c")
  #   [{#PID<0.381.0>, [node: :"node1@127.0.0.1"]}]
  # or the whole cluster when the zone is unset/unknown or empty.
  defp candidate_nodes(zone) when is_binary(zone) do
    case :syn.members(:availability_zone, zone) do
      [] -> [node() | Node.list()]
      members -> Enum.map(members, fn {_pid, [node: node]} -> node end)
    end
  end

  defp candidate_nodes(_zone), do: [node() | Node.list()]
317

318
  # Starts a pool on this node unless the tenant already runs @max_pools.
  @spec try_start_local_pool(id, secrets, atom()) :: {:ok, pid} | {:error, any}
  def try_start_local_pool(id, secrets, log_level) do
    if count_pools(tenant(id)) < @max_pools do
      start_local_pool(id, secrets, log_level)
    else
      {:error, :max_pools_reached}
    end
  end
324

325
  # Starts the tenant supervision tree (and its pool(s)) on this node.
  #
  # Resolves the replica configuration from Tenants, reduces it to the
  # minimal per-replica info the supervisor needs, then starts a
  # TenantSupervisor child under the partitioned DynamicSupervisor.
  # Losing a start race to a concurrent caller is treated as success.
  @spec start_local_pool(id, secrets, atom()) :: {:ok, pid} | {:error, any}
  def start_local_pool(
        {{type, tenant}, _user, _mode, _db_name, _search_path} = id,
        secrets,
        log_level \\ nil
      ) do
    Logger.info("Starting pool(s) for #{inspect(id)}")

    # `secrets` is {:password | :auth_query, fun}; invoke the fun here to
    # obtain the actual secrets map (kept lazy until this point).
    secrets_map = elem(secrets, 1).()
    user = secrets_map.user

    case type do
      :single -> Tenants.get_pool_config_cache(tenant, user)
      :cluster -> Tenants.get_cluster_config(tenant, user)
    end
    |> case do
      [_ | _] = replicas ->
        # Extract only minimal info needed for pool creation
        replicas_info =
          Enum.map(replicas, fn replica ->
            case replica do
              %Tenants.ClusterTenants{tenant: tenant, type: type} ->
                # Pool size comes from the first user when present,
                # otherwise from the tenant-level default.
                first_user = List.first(tenant.users)

                %{
                  replica_type: type,
                  pool_size:
                    if(first_user, do: first_user.pool_size, else: tenant.default_pool_size)
                }

              %Tenants.Tenant{} = tenant ->
                # A single (non-cluster) tenant is always the writer.
                first_user = List.first(tenant.users)

                %{
                  replica_type: :write,
                  pool_size:
                    if(first_user, do: first_user.pool_size, else: tenant.default_pool_size)
                }
            end
          end)

        DynamicSupervisor.start_child(
          {:via, PartitionSupervisor, {Supavisor.DynamicSupervisor, id}},
          {Supavisor.TenantSupervisor,
           %{id: id, replicas: replicas_info, secrets: secrets, log_level: log_level}}
        )
        |> case do
          # Another caller won the race; the running pid is still the answer.
          {:error, {:already_started, pid}} -> {:ok, pid}
          resp -> resp
        end

      error ->
        Logger.error("Can't find tenant with external_id #{inspect(id)} #{inspect(error)}")

        {:error, :tenant_not_found}
    end
  end
382

383
  ## Internal functions

  # Forwards backend parameter-status pairs to the tenant's local Manager.
  @spec set_parameter_status(id, [{binary, binary}]) ::
          :ok | {:error, :not_found}
  def set_parameter_status(id, ps) do
    if pid = get_local_manager(id) do
      Manager.set_parameter_status(pid, ps)
    else
      {:error, :not_found}
    end
  end
393

394
  # Fetches the host/port metadata the tenant's supervisor registered in
  # :syn, when both fields are present.
  @spec get_pool_ranch(id) :: {:ok, map()} | {:error, :not_found}
  def get_pool_ranch(id) do
    with {_sup_pid, %{port: _port, host: _host} = meta} <- :syn.lookup(:tenants, id) do
      {:ok, meta}
    else
      _ -> {:error, :not_found}
    end
  end
401

402
  # Resolves this node's proxy endpoint for the id's mode: the configured
  # node host plus the ranch listener port, sharded by hashing the id over
  # the mode's configured port list.
  @spec get_local_server(id) :: map()
  def get_local_server({_, _, mode, _, _} = id) do
    ports =
      case mode do
        :session -> Application.fetch_env!(:supavisor, :session_proxy_ports)
        :transaction -> Application.fetch_env!(:supavisor, :transaction_proxy_ports)
      end

    shard = :erlang.phash2(id, length(ports))

    %{
      host: Application.get_env(:supavisor, :node_host),
      port: :ranch.get_port({:pg_proxy_internal, mode, shard})
    }
  end
415

416
  # Number of pools currently registered for `tenant` on this node.
  @spec count_pools(String.t()) :: non_neg_integer()
  def count_pools(tenant) do
    Registry.count_match(Supavisor.Registry.TenantSups, tenant, :_)
  end
419
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc