• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supavisor / 19370957114

14 Nov 2025 04:30PM UTC coverage: 62.682% (+1.4%) from 61.246%
19370957114

Pull #744

github

web-flow
Merge fd252a012 into 0224a24c8
Pull Request #744: fix(defrag): improve statems, caching, logs, circuit breaking

592 of 785 new or added lines in 22 files covered. (75.41%)

18 existing lines in 5 files now uncovered.

1809 of 2886 relevant lines covered (62.68%)

4508.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.64
/lib/supavisor/manager.ex
1
defmodule Supavisor.Manager do
2
  @moduledoc """
3
  The Manager is responsible for managing the config and parameter status for a pool
4
  """
5

6
  use GenServer, restart: :transient
7
  require Logger
8

9
  alias Supavisor.Protocol.Server
10
  alias Supavisor.Tenants
11
  alias Supavisor.Helpers
12

13
  @check_timeout 120_000
14

15
  @doc """
16
  Starts the pool manager
17
  """
18
  def start_link(args) do
19
    name = {:via, Registry, {Supavisor.Registry.Tenants, {:manager, args.id}}}
34✔
20

21
    GenServer.start_link(__MODULE__, args, name: name)
34✔
22
  end
23

24
  @doc """
25
  Subscribes to a pool
26

27
  Upon subscription, a client "joins" a pool. From this point, it's an active connection and
28
  it may receive updates destined to the pool.
29
  """
30
  @spec subscribe(pid, pid) ::
31
          {:ok, iodata() | [], integer}
32
          | {:error, :max_clients_reached}
33
          | {:error, :terminating, term()}
34
  def subscribe(manager, pid) do
35
    GenServer.call(manager, {:subscribe, pid})
602✔
36
  end
37

38
  @doc """
39
  Updates parameter status for the pool
40

41
  Sends the parameter status update to all subscribed client handlers.
42
  """
43
  @spec set_parameter_status(pid, map) :: :ok
44
  def set_parameter_status(manager, ps) do
45
    GenServer.call(manager, {:set_parameter_status, ps})
293✔
46
  end
47

48
  @doc """
49
  Get the current parameter status for a pool
50
  """
51
  @spec get_parameter_status(pid) :: iodata() | []
52
  def get_parameter_status(manager) do
53
    GenServer.call(manager, :get_parameter_status)
×
54
  end
55

56
  @doc """
57
  Get the current config for a pool
58
  """
59
  @spec get_config(pid | Supavisor.id()) :: map()
60
  def get_config(manager_or_id) do
61
    manager = resolve_manager(manager_or_id)
305✔
62
    GenServer.call(manager, :get_config)
305✔
63
  end
64

65
  @doc """
66
  Terminates the pool and notifies all subscribed clients
67
  """
68
  @spec terminate_pool(pid | Supavisor.id(), map()) :: :ok
69
  def terminate_pool(manager_or_id, error) do
70
    manager = resolve_manager(manager_or_id)
2✔
71
    GenServer.cast(manager, {:terminate_pool, error})
2✔
72
  end
73

74
  # Helper to resolve manager PID from either PID or ID
NEW
75
  defp resolve_manager(pid) when is_pid(pid), do: pid
×
76

77
  defp resolve_manager(id) do
78
    case Supavisor.get_local_manager(id) do
307✔
NEW
79
      nil -> raise "Manager not found for pool #{inspect(id)}"
×
80
      pid -> pid
307✔
81
    end
82
  end
83

84
  ## Callbacks
85

86
  @impl true
87
  def init(args) do
88
    Helpers.set_log_level(args.log_level)
34✔
89
    tid = :ets.new(__MODULE__, [:protected])
34✔
90

91
    {{type, tenant}, user, mode, db_name, _search_path} = args.id
34✔
92
    {method, secrets} = args.secrets
34✔
93

94
    {tenant_record, replica_type} =
34✔
95
      case type do
96
        :single ->
97
          tenants = Tenants.get_pool_config_cache(tenant, user)
34✔
98
          {List.first(tenants), :write}
99

100
        :cluster ->
NEW
101
          case Tenants.get_cluster_config(tenant, user) do
×
102
            {:error, reason} ->
NEW
103
              raise "Failed to get cluster config: #{inspect(reason)}"
×
104

105
            cluster_tenants ->
NEW
106
              selected =
×
NEW
107
                Enum.find(cluster_tenants, fn ct -> ct.type == :write end) ||
×
NEW
108
                  List.first(cluster_tenants)
×
109

NEW
110
              case selected do
×
NEW
111
                %Tenants.ClusterTenants{tenant: t, type: rt} -> {t, rt}
×
NEW
112
                nil -> raise "No cluster tenant configuration found"
×
113
              end
114
          end
115
      end
116

117
    %{
118
      db_host: db_host,
119
      db_port: db_port,
120
      db_database: db_database,
121
      auth_query: auth_query,
122
      default_parameter_status: ps,
123
      ip_version: ip_ver,
124
      default_pool_size: default_pool_size,
125
      default_max_clients: default_max_clients,
126
      client_idle_timeout: client_idle_timeout,
127
      sni_hostname: sni_hostname,
128
      feature_flags: feature_flags
129
    } = tenant_record
34✔
130

131
    user_config = List.first(tenant_record.users)
34✔
132
    pool_size = (user_config && user_config.pool_size) || default_pool_size
34✔
133
    max_clients = (user_config && user_config.max_clients) || default_max_clients
34✔
134

135
    auth = %{
34✔
136
      host: String.to_charlist(db_host),
137
      sni_hostname: if(sni_hostname != nil, do: to_charlist(sni_hostname)),
34✔
138
      port: db_port,
139
      user: user,
140
      auth_query: auth_query,
141
      database: if(db_name != nil, do: db_name, else: db_database),
34✔
142
      application_name: "Supavisor",
143
      ip_version: Helpers.ip_version(ip_ver, db_host),
144
      upstream_ssl: tenant_record.upstream_ssl,
34✔
145
      upstream_verify: tenant_record.upstream_verify,
34✔
146
      upstream_tls_ca: Helpers.upstream_cert(tenant_record.upstream_tls_ca),
34✔
147
      require_user: tenant_record.require_user,
34✔
148
      method: method,
149
      secrets: args.secrets
34✔
150
    }
151

152
    secrets_struct = secrets.()
34✔
153

154
    alias Supavisor.ClientHandler.Auth
155

156
    if method == :password or match?(%Auth.SASLSecrets{}, secrets_struct) do
34✔
157
      Supavisor.SecretCache.put_both(tenant, user, method, secrets, :infinity)
34✔
158
    else
NEW
159
      Supavisor.SecretCache.put_validation_secrets_if_missing(tenant, user, method, secrets)
×
160
    end
161

162
    state = %{
34✔
163
      id: args.id,
34✔
164
      check_ref: check_subscribers(),
165
      tid: tid,
166
      tenant: tenant,
167
      parameter_status: [],
168
      wait_ps: [],
169
      default_parameter_status: ps,
170
      max_clients: max_clients,
171
      idle_timeout: client_idle_timeout,
172
      auth: auth,
173
      mode: mode,
174
      replica_type: replica_type,
175
      pool_size: pool_size,
176
      log_level: args.log_level,
34✔
177
      tenant_feature_flags: feature_flags,
178
      terminating_error: nil
179
    }
180

181
    Logger.metadata(project: tenant, user: user, type: type, db_name: db_name)
34✔
182
    Registry.register(Supavisor.Registry.ManagerTables, args.id, tid)
34✔
183

184
    {:ok, state}
185
  end
186

187
  @impl true
188
  def handle_call({:subscribe, _pid}, _, %{terminating_error: error} = state)
189
      when not is_nil(error) do
NEW
190
    Logger.warning("Rejecting subscription to terminating pool #{inspect(state.id)}")
×
NEW
191
    {:reply, {:error, :terminating, error}, state}
×
192
  end
193

194
  def handle_call({:subscribe, pid}, _, state) do
195
    Logger.debug("Subscribing #{inspect(pid)} to tenant #{inspect(state.id)}")
602✔
196

197
    {reply, new_state} =
602✔
198
      if :ets.info(state.tid, :size) < state.max_clients or Supavisor.mode(state.id) == :session do
602✔
199
        :ets.insert(state.tid, {Process.monitor(pid), pid, now()})
601✔
200

201
        case state.parameter_status do
601✔
202
          [] ->
38✔
203
            {{:ok, [], state.idle_timeout}, update_in(state.wait_ps, &[pid | &1])}
38✔
204

205
          ps ->
563✔
206
            {{:ok, ps, state.idle_timeout}, state}
563✔
207
        end
208
      else
209
        {{:error, :max_clients_reached}, state}
210
      end
211

212
    {:reply, reply, new_state}
602✔
213
  end
214

215
  def handle_call({:set_parameter_status, ps}, _, %{parameter_status: []} = state) do
216
    encoded_ps = Server.encode_parameter_status(ps)
32✔
217
    maybe_update_parameter_status(state.tenant, ps, state.default_parameter_status)
32✔
218

219
    for pid <- state.wait_ps do
32✔
220
      send(pid, {:parameter_status, encoded_ps})
37✔
221
    end
222

223
    {:reply, :ok, %{state | parameter_status: encoded_ps, wait_ps: []}}
32✔
224
  end
225

226
  def handle_call({:set_parameter_status, _ps}, _, state) do
227
    {:reply, :ok, state}
261✔
228
  end
229

230
  def handle_call(:get_config, _, state) do
231
    config = %{
304✔
232
      id: state.id,
304✔
233
      auth: state.auth,
304✔
234
      user: elem(state.id, 1),
304✔
235
      tenant: {:single, state.tenant},
304✔
236
      mode: state.mode,
304✔
237
      replica_type: state.replica_type,
304✔
238
      log_level: state.log_level,
304✔
239
      tenant_feature_flags: state.tenant_feature_flags
304✔
240
    }
241

242
    {:reply, config, state}
304✔
243
  end
244

245
  def handle_call(:get_parameter_status, _, state) do
NEW
246
    {:reply, state.parameter_status, state}
×
247
  end
248

249
  @impl true
250
  def handle_cast({:terminate_pool, error}, state) do
251
    Logger.warning("Terminating pool #{inspect(state.id)} due to error: #{inspect(error)}")
2✔
252

253
    client_pids = :ets.tab2list(state.tid) |> Enum.map(fn {_ref, pid, _time} -> pid end)
2✔
254

255
    error_message = Server.encode_error_message(error)
2✔
256

257
    for pid <- client_pids do
2✔
258
      Supavisor.ClientHandler.send_error_and_terminate(pid, error_message)
1✔
259
    end
260

261
    # Use a task to stop the pool supervisor to avoid deadlock, since the Manager
262
    # is a child of the TenantSupervisor that Supavisor.stop/1 will terminate
263
    Task.Supervisor.start_child(Supavisor.PoolTerminator, fn ->
2✔
264
      Supavisor.stop(state.id)
2✔
265
    end)
266

267
    {:noreply, %{state | terminating_error: error}}
268
  end
269

270
  @impl true
271
  def handle_info({:DOWN, ref, _, _, _}, state) do
272
    Process.cancel_timer(state.check_ref)
601✔
273
    :ets.take(state.tid, ref)
601✔
274
    {:noreply, %{state | check_ref: check_subscribers()}}
275
  end
276

277
  def handle_info(:check_subscribers, state) do
278
    Process.cancel_timer(state.check_ref)
×
279

280
    if :ets.info(state.tid, :size) == 0 do
×
281
      Logger.info("No subscribers for pool #{inspect(state.id)}, shutting down")
×
282
      Supavisor.stop(state.id)
×
283
      {:stop, :normal}
284
    else
285
      {:noreply, %{state | check_ref: check_subscribers()}}
286
    end
287
  end
288

289
  def handle_info(msg, state) do
290
    Logger.warning("Undefined msg: #{inspect(msg, pretty: true)}")
×
291
    {:noreply, state}
292
  end
293

294
  @impl true
295
  def terminate(_reason, state) do
NEW
296
    {{_type, tenant}, user, _mode, _db, _search} = state.id
×
NEW
297
    Supavisor.SecretCache.clean_upstream_secrets(tenant, user)
×
298
    :ok
299
  end
300

301
  ## Internal functions
302

303
  defp check_subscribers do
304
    Process.send_after(
635✔
305
      self(),
306
      :check_subscribers,
307
      @check_timeout
308
    )
309
  end
310

311
  defp now do
312
    System.system_time(:second)
601✔
313
  end
314

315
  @spec maybe_update_parameter_status(binary, map, map) :: :ok
316
  defp maybe_update_parameter_status(tenant, parameter_status, default_parameter_status) do
317
    parameter_status
318
    |> Enum.filter(fn {key, new_value} ->
319
      case default_parameter_status do
416✔
320
        %{^key => value} when value != new_value -> true
2✔
321
        _ -> false
414✔
322
      end
323
    end)
324
    |> case do
32✔
325
      [] ->
30✔
326
        :ok
327

328
      changed_parameters ->
329
        Logger.warning("Changed parameters: #{inspect(changed_parameters)}")
2✔
330

331
        # TODO: should we update all? Previously we only updated server version
332
        changed_parameters = Map.new(changed_parameters)
2✔
333
        Tenants.update_tenant_ps(tenant, %{server_version: changed_parameters["server_version"]})
2✔
334

335
        :ok
336
    end
337
  end
338
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc