supabase / supavisor / e5e7ebfe80dbec4965226225050d4ef5c8216e88-PR-605

21 Feb 2025 02:35PM UTC coverage: 45.973% (-0.03%) from 46.003%

Pull Request #605 (hauleth, via github): fix: remaining SSL connections that need to set `verify_none` option

2 of 9 new or added lines in 3 files covered (22.22%).

267 existing lines in 26 files now uncovered.

959 of 2086 relevant lines covered (45.97%).

635.02 hits per line.

Source File: /lib/supavisor.ex (69.23% covered)

defmodule Supavisor do
  @moduledoc false

  require Logger

  alias Supavisor.{
    Helpers,
    Manager,
    Tenants
  }

  @type sock :: tcp_sock() | ssl_sock()
  @type ssl_sock :: {:ssl, :ssl.sslsocket()}
  @type tcp_sock :: {:gen_tcp, :gen_tcp.socket()}
  @type workers :: %{manager: pid, pool: pid}
  @type secrets :: {:password | :auth_query, fun()}
  @type mode :: :transaction | :session | :native | :proxy
  @type id :: {{:single | :cluster, String.t()}, String.t(), mode, String.t(), String.t() | nil}
  @type subscribe_opts :: %{workers: workers, ps: list, idle_timeout: integer}
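
  # Illustrative value of the `id` type above (hypothetical names): a
  # single-node tenant pooled in transaction mode would be identified by
  #   {{:single, "example_tenant"}, "postgres", :transaction, "postgres", nil}
  # i.e. {tenant scope, user, pooling mode, database name, search_path}.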

  @registry Supavisor.Registry.Tenants
  @max_pools Application.compile_env(:supavisor, :max_pools, 20)
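
  # start_dist/3 starts (or reuses) the connection pool for `id`: if a
  # supervisor for this id is already registered globally via :syn, its pid is
  # returned as-is; otherwise the pool is started on the node selected by
  # determine_node/2 (or on the node passed via the :force_node option),
  # either locally or through an RPC call to the remote node.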

  @spec start_dist(id, secrets, keyword()) :: {:ok, pid()} | {:error, any()}
  def start_dist(id, secrets, options \\ []) do
    options =
      Keyword.validate!(options, log_level: nil, force_node: false, availability_zone: nil)

    log_level = Keyword.fetch!(options, :log_level)
    force_node = Keyword.fetch!(options, :force_node)
    availability_zone = Keyword.fetch!(options, :availability_zone)

    case get_global_sup(id) do
      nil ->
        node = if force_node, do: force_node, else: determine_node(id, availability_zone)

        if node == node() do
          Logger.debug("Starting local pool for #{inspect(id)}")
          try_start_local_pool(id, secrets, log_level)
        else
          Logger.debug("Starting remote pool for #{inspect(id)}")
          Helpers.rpc(node, __MODULE__, :try_start_local_pool, [id, secrets, log_level])
        end

      pid ->
        {:ok, pid}
    end
  end

  @spec start(id, secrets) :: {:ok, pid} | {:error, any}
  def start(id, secrets) do
    case get_global_sup(id) do
      nil ->
        try_start_local_pool(id, secrets, nil)

      pid ->
        {:ok, pid}
    end
  end

  @spec stop(id) :: :ok | {:error, :tenant_not_found}
  def stop(id) do
    case get_global_sup(id) do
      nil -> {:error, :tenant_not_found}
      pid -> Supervisor.stop(pid)
    end
  end

  @spec get_local_workers(id) :: {:ok, workers} | {:error, :worker_not_found}
  def get_local_workers(id) do
    workers = %{
      manager: get_local_manager(id),
      pool: get_local_pool(id)
    }

    if Map.values(workers) |> Enum.member?(nil) do
      Logger.error("Could not get workers for tenant #{inspect(id)}")
      {:error, :worker_not_found}
    else
      {:ok, workers}
    end
  end

  @spec subscribe_local(pid, id) :: {:ok, subscribe_opts} | {:error, any()}
  def subscribe_local(pid, id) do
    with {:ok, workers} <- get_local_workers(id),
         {:ok, ps, idle_timeout} <- Manager.subscribe(workers.manager, pid) do
      {:ok, %{workers: workers, ps: ps, idle_timeout: idle_timeout}}
    end
  end

  @spec subscribe(pid, id, pid) :: {:ok, subscribe_opts} | {:error, any()}
  def subscribe(sup, id, pid \\ self()) do
    dest_node = node(sup)

    if node() == dest_node do
      subscribe_local(pid, id)
    else
      Helpers.rpc(dest_node, __MODULE__, :subscribe_local, [pid, id], 15_000)
    end
  end

  @spec get_global_sup(id) :: pid | nil
  def get_global_sup(id) do
    case :syn.whereis_name({:tenants, id}) do
      :undefined -> nil
      pid -> pid
    end
  end

  @doc """
  During netsplits, or due to certain internal conflicts, :syn may store inconsistent data across the cluster.
  This function terminates all connection trees related to a specific tenant.
  """
  @spec dirty_terminate(String.t(), pos_integer()) :: map()
  def dirty_terminate(tenant, timeout \\ 15_000) do
    Registry.lookup(Supavisor.Registry.TenantSups, tenant)
    |> Enum.reduce(%{}, fn {pid, %{user: user, mode: _mode}}, acc ->
      stop =
        try do
          Supervisor.stop(pid, :shutdown, timeout)
        catch
          error, reason -> {:error, {error, reason}}
        end

      resp = %{
        stop: stop,
        cache: del_all_cache(tenant, user)
      }

      Map.put(acc, user, resp)
    end)
  end

  def terminate_global(tenant) do
    :erpc.multicall([node() | Node.list()], Supavisor, :dirty_terminate, [tenant], 60_000)
  end

  @spec del_all_cache(String.t(), String.t()) :: [map()]
  def del_all_cache(tenant, user) do
    Logger.info("Deleting all cache for tenant #{tenant} and user #{user}")
    {:ok, keys} = Cachex.keys(Supavisor.Cache)

    del = fn key, acc ->
      result = Cachex.del(Supavisor.Cache, key)
      [%{inspect(key) => inspect(result)} | acc]
    end

    Enum.reduce(keys, [], fn
      {:metrics, ^tenant} = key, acc -> del.(key, acc)
      {:secrets, ^tenant, ^user} = key, acc -> del.(key, acc)
      {:user_cache, _, ^user, ^tenant, _} = key, acc -> del.(key, acc)
      {:tenant_cache, ^tenant, _} = key, acc -> del.(key, acc)
      {:pool_config_cache, ^tenant, ^user} = key, acc -> del.(key, acc)
      _, acc -> acc
    end)
  end

  @spec del_all_cache(String.t()) :: [map()]
  def del_all_cache(tenant) do
    Logger.info("Deleting all cache for tenant #{tenant}")

    del = fn key, acc ->
      result = Cachex.del(Supavisor.Cache, key)
      [%{inspect(key) => inspect(result)} | acc]
    end

    :ets.foldl(
      fn
        {:entry, key, _, _, _result}, acc ->
          case key do
            {:metrics, ^tenant} -> del.(key, acc)
            {:secrets, ^tenant, _} -> del.(key, acc)
            {:user_cache, _, _, ^tenant, _} -> del.(key, acc)
            {:tenant_cache, ^tenant, _} -> del.(key, acc)
            {:pool_config_cache, ^tenant, _} -> del.(key, acc)
            _ -> acc
          end

        other, acc ->
          Logger.error("Unknown key: #{inspect(other)}")
          acc
      end,
      [],
      Supavisor.Cache
    )
  end

  @spec del_all_cache_dist(String.t(), pos_integer()) :: [map()]
  def del_all_cache_dist(tenant, timeout \\ 15_000) do
    Logger.info("Deleting all dist cache for tenant #{tenant}")

    for node <- [node() | Node.list()] do
      %{to_string(node) => :erpc.call(node, Supavisor, :del_all_cache, [tenant], timeout)}
    end
  end

  @spec get_local_pool(id) :: map | pid | nil
  def get_local_pool(id) do
    match = {{:pool, :_, :_, id}, :"$2", :"$3"}
    body = [{{:"$2", :"$3"}}]

    case Registry.select(@registry, [{match, [], body}]) do
      [{pool, _}] ->
        pool

      [_ | _] = pools ->
        # transform [{pid1, :read}, {pid2, :read}, {pid3, :write}]
        # to %{read: [pid1, pid2], write: [pid3]}
        Enum.group_by(pools, &elem(&1, 1), &elem(&1, 0))

      _ ->
        nil
    end
  end

  @spec get_local_manager(id) :: pid | nil
  def get_local_manager(id) do
    case Registry.lookup(@registry, {:manager, id}) do
      [{pid, _}] -> pid
      _ -> nil
    end
  end

  @spec id({:single | :cluster, String.t()}, String.t(), mode, mode, String.t(), String.t() | nil) ::
          id
  def id(tenant, user, port_mode, user_mode, db_name, search_path) do
    # temporary hack
    mode =
      if port_mode == :transaction do
        user_mode
      else
        port_mode
      end

    {tenant, user, mode, db_name, search_path}
  end

  @spec tenant(id) :: String.t()
  def tenant({{_, tenant}, _, _, _, _}), do: tenant

  @spec mode(id) :: atom()
  def mode({_, _, mode, _, _}), do: mode

  @spec search_path(id) :: String.t() | nil
  def search_path({_, _, _, _, search_path}), do: search_path

  @spec determine_node(id, String.t() | nil) :: Node.t()
  def determine_node(id, availability_zone) do
    tenant_id = tenant(id)

    # If the AWS zone group is empty, we will use all nodes.
    # If the AWS zone group exists with the same zone, we will use nodes from this group.
    #   :syn.members(:availability_zone, "1c")
    #   [{#PID<0.381.0>, [node: :"node1@127.0.0.1"]}]
    nodes =
      with zone when is_binary(zone) <- availability_zone,
           zone_nodes when zone_nodes != [] <- :syn.members(:availability_zone, zone) do
        zone_nodes
        |> Enum.map(fn {_, [node: node]} -> node end)
      else
        _ -> [node() | Node.list()]
      end

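    # :erlang.phash2/2 maps the tenant to a stable index into the sorted node
    # list, so every node computing this picks the same owner node for a given
    # tenant (as long as the node lists agree).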
    index = :erlang.phash2(tenant_id, length(nodes))

    nodes
    |> Enum.sort()
    |> Enum.at(index)
  end

  @spec try_start_local_pool(id, secrets, atom()) :: {:ok, pid} | {:error, any}
  def try_start_local_pool(id, secrets, log_level) do
    if count_pools(tenant(id)) < @max_pools,
      do: start_local_pool(id, secrets, log_level),
      else: {:error, :max_pools_reached}
  end

  @spec start_local_pool(id, secrets, atom()) :: {:ok, pid} | {:error, any}
  def start_local_pool(
        {{type, tenant}, _user, _mode, _db_name, _search_path} = id,
        secrets,
        log_level \\ nil
      ) do
    Logger.info("Starting pool(s) for #{inspect(id)}")

    user = elem(secrets, 1).().alias

    case type do
      :single -> Tenants.get_pool_config_cache(tenant, user)
      :cluster -> Tenants.get_cluster_config(tenant, user)
    end
    |> case do
      [_ | _] = replicas ->
        opts =
          Enum.map(replicas, fn replica ->
            case replica do
              %Tenants.ClusterTenants{tenant: tenant, type: type} ->
                Map.put(tenant, :replica_type, type)

              %Tenants.Tenant{} = tenant ->
                Map.put(tenant, :replica_type, :write)
            end
            |> supervisor_args(id, secrets, log_level)
          end)

        DynamicSupervisor.start_child(
          {:via, PartitionSupervisor, {Supavisor.DynamicSupervisor, id}},
          {Supavisor.TenantSupervisor, %{id: id, replicas: opts, log_level: log_level}}
        )
        |> case do
          {:error, {:already_started, pid}} -> {:ok, pid}
          resp -> resp
        end

      error ->
        Logger.error("Can't find tenant with external_id #{inspect(id)} #{inspect(error)}")

        {:error, :tenant_not_found}
    end
  end

  ## Internal functions

  defp supervisor_args(
         tenant_record,
         {tenant, user, mode, db_name, _search_path} = id,
         {method, secrets},
         log_level
       ) do
    %{
      db_host: db_host,
      db_port: db_port,
      db_database: db_database,
      auth_query: auth_query,
      default_parameter_status: ps,
      ip_version: ip_ver,
      default_pool_size: def_pool_size,
      default_max_clients: def_max_clients,
      client_idle_timeout: client_idle_timeout,
      replica_type: replica_type,
      sni_hostname: sni_hostname,
      users: [
        %{
          db_user: db_user,
          db_password: db_pass,
          pool_size: pool_size,
          db_user_alias: alias,
          # mode_type: mode_type,
          max_clients: max_clients
        }
      ]
    } = tenant_record

    {pool_size, max_clients} =
      if method == :auth_query do
        {def_pool_size, def_max_clients}
      else
        {pool_size, max_clients}
      end

    auth = %{
      host: String.to_charlist(db_host),
      sni_hostname: if(sni_hostname != nil, do: to_charlist(sni_hostname)),
      port: db_port,
      user: db_user,
      alias: alias,
      auth_query: auth_query,
      database: if(db_name != nil, do: db_name, else: db_database),
      password: fn -> db_pass end,
      application_name: "Supavisor",
      ip_version: Helpers.ip_version(ip_ver, db_host),
      upstream_ssl: tenant_record.upstream_ssl,
      upstream_verify: tenant_record.upstream_verify,
      upstream_tls_ca: Helpers.upstream_cert(tenant_record.upstream_tls_ca),
      require_user: tenant_record.require_user,
      method: method,
      secrets: secrets
    }

    %{
      id: id,
      tenant: tenant,
      replica_type: replica_type,
      user: user,
      auth: auth,
      pool_size: pool_size,
      mode: mode,
      default_parameter_status: ps,
      max_clients: max_clients,
      client_idle_timeout: client_idle_timeout,
      log_level: log_level
    }
  end

  @spec set_parameter_status(id, [{binary, binary}]) ::
          :ok | {:error, :not_found}
  def set_parameter_status(id, ps) do
    case get_local_manager(id) do
      nil -> {:error, :not_found}
      pid -> Manager.set_parameter_status(pid, ps)
    end
  end

  @spec get_pool_ranch(id) :: {:ok, map()} | {:error, :not_found}
  def get_pool_ranch(id) do
    case :syn.lookup(:tenants, id) do
      {_sup_pid, %{port: _port, host: _host} = meta} -> {:ok, meta}
      _ -> {:error, :not_found}
    end
  end

  @spec start_local_server(map()) :: {:ok, map()} | {:error, any()}
  def start_local_server(%{max_clients: max_clients} = args) do
    # max_clients=-1 is used for testing the maximum allowed clients in ProxyTest
    {acceptors, max_clients} =
      if max_clients > 0,
        do: {ceil(max_clients / 100), max_clients},
        else: {1, 100}

    opts = %{
      max_connections: max_clients * Application.get_env(:supavisor, :local_proxy_multiplier),
      num_acceptors: max(acceptors, 10),
      socket_opts: [port: 0, keepalive: true]
    }

    handler = Supavisor.ClientHandler
    args = Map.put(args, :local, true)

    with {:ok, pid} <- :ranch.start_listener(args.id, :ranch_tcp, opts, handler, args) do
      host = Application.get_env(:supavisor, :node_host)
      {:ok, %{listener: pid, host: host, port: :ranch.get_port(args.id)}}
    end
  end

  @spec count_pools(String.t()) :: non_neg_integer()
  def count_pools(tenant),
    do: Registry.count_match(Supavisor.Registry.TenantSups, tenant, :_)
end
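
A minimal usage sketch (illustrative only, not part of /lib/supavisor.ex): the tenant name, user, and the shape of the secrets closure below are hypothetical, inferred from the `secrets` typespec and the `elem(secrets, 1).().alias` access in start_local_pool/3, and the calls assume a running Supavisor node with a matching tenant record configured.

# Hypothetical secrets: a closure returning a map with at least an :alias key.
secrets = {:password, fn -> %{alias: "postgres", password: "example"} end}

# Build a pool id: {{:single, tenant}, user, mode, db_name, search_path}.
id =
  Supavisor.id({:single, "example_tenant"}, "postgres", :transaction, :transaction, "postgres", nil)

# Start (or look up) the pool, subscribe the calling process, then stop it.
{:ok, sup} = Supavisor.start_dist(id, secrets)
{:ok, %{workers: _workers, ps: _ps, idle_timeout: _}} = Supavisor.subscribe(sup, id)
:ok = Supavisor.stop(id)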