• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supavisor / 19370957114

14 Nov 2025 04:30PM UTC coverage: 62.682% (+1.4%) from 61.246%
19370957114

Pull #744

github

web-flow
Merge fd252a012 into 0224a24c8
Pull Request #744: fix(defrag): improve statems, caching, logs, circuit breaking

592 of 785 new or added lines in 22 files covered. (75.41%)

18 existing lines in 5 files now uncovered.

1809 of 2886 relevant lines covered (62.68%)

4508.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.72
/lib/supavisor/tenants.ex
1
defmodule Supavisor.Tenants do
2
  @moduledoc """
3
  The Tenants context.
4
  """
5

6
  import Ecto.Query, warn: false
7
  alias Supavisor.Repo
8

9
  alias Supavisor.ClientHandler.Auth.ManagerSecrets
10
  alias Supavisor.Tenants.Cluster
11
  alias Supavisor.Tenants.ClusterTenants
12
  alias Supavisor.Tenants.Tenant
13
  alias Supavisor.Tenants.User
14

15
  require Logger
16

17
  @doc """
  Fetches every tenant, with each tenant's `:users` association preloaded.

  ## Examples

      iex> list_tenants()
      [%Tenant{}, ...]

  """
  def list_tenants do
    tenants = Repo.all(Tenant)
    Repo.preload(tenants, [:users])
  end
29

30
  @doc """
  Looks up one tenant by primary key.

  Raises `Ecto.NoResultsError` when no tenant has the given id.

  ## Examples

      iex> get_tenant!(123)
      %Tenant{}

      iex> get_tenant!(456)
      ** (Ecto.NoResultsError)

  """
  def get_tenant!(id) do
    Repo.get!(Tenant, id)
  end
3✔
45

46
  @doc """
  Gets a single tenant by `external_id`, with its `:users` preloaded.

  Returns `nil` when no tenant has the given `external_id`
  (unlike `get_tenant!/1`, this function does not raise on a miss).

  ## Examples

      iex> get_tenant_by_external_id("123")
      %Tenant{}

      iex> get_tenant_by_external_id("456")
      nil

  """
  @spec get_tenant_by_external_id(String.t()) :: Tenant.t() | nil
  def get_tenant_by_external_id(external_id) do
    # Repo.preload/2 passes nil through, so a miss stays nil.
    Tenant |> Repo.get_by(external_id: external_id) |> Repo.preload(:users)
  end
61

62
  @doc """
  Gets a cluster by its `alias`, with `:cluster_tenants` preloaded.

  Returns `nil` when no cluster matches.
  """
  @spec get_cluster_by_alias(String.t()) :: Cluster.t() | nil
  def get_cluster_by_alias(alias) do
    cluster = Repo.get_by(Cluster, alias: alias)
    Repo.preload(cluster, :cluster_tenants)
  end
66

67
  @doc """
  Cached variant of `get_tenant/2`.

  The lookup result is stored in `Supavisor.Cache` under
  `{:tenant_cache, external_id, sni_hostname}` for 24 hours, wrapped in a
  `{:cached, value}` tuple. Any `Cachex.fetch/3` result other than the two
  shapes matched below raises a `CaseClauseError`.
  """
  @spec get_tenant_cache(String.t() | nil, String.t() | nil) :: Tenant.t() | nil
  def get_tenant_cache(external_id, sni_hostname) do
    key = {:tenant_cache, external_id, sni_hostname}

    loader = fn _key ->
      tenant = get_tenant(external_id, sni_hostname)
      {:commit, {:cached, tenant}, ttl: :timer.hours(24)}
    end

    fetched = Cachex.fetch(Supavisor.Cache, key, loader)

    case fetched do
      {:ok, {:cached, tenant}} -> tenant
      {:commit, {:cached, tenant}, _opts} -> tenant
    end
  end
78

79
  @doc """
  Gets a tenant by `external_id`, or by `sni_hostname` when `external_id` is `nil`.

  A non-nil `external_id` always takes precedence over `sni_hostname`.
  Returns `nil` when both arguments are `nil` or when nothing matches.
  """
  @spec get_tenant(String.t() | nil, String.t() | nil) :: Tenant.t() | nil
  def get_tenant(nil, nil), do: nil

  def get_tenant(nil, sni_hostname) do
    Repo.get_by(Tenant, sni_hostname: sni_hostname)
  end

  def get_tenant(external_id, _sni_hostname) do
    Repo.get_by(Tenant, external_id: external_id)
  end
1✔
89

90
  @doc """
  Cached variant of `get_user/4`.

  The whole `get_user/4` result (success or error tuple) is stored in
  `Supavisor.Cache` under `{:user_cache, type, user, external_id, sni_hostname}`
  for 24 hours, wrapped in a `{:cached, value}` tuple.
  """
  @spec get_user_cache(:single | :cluster, String.t(), String.t() | nil, String.t() | nil) ::
          {:ok, map()} | {:error, any()}
  def get_user_cache(type, user, external_id, sni_hostname) do
    key = {:user_cache, type, user, external_id, sni_hostname}

    loader = fn _key ->
      result = get_user(type, user, external_id, sni_hostname)
      {:commit, {:cached, result}, ttl: :timer.hours(24)}
    end

    fetched = Cachex.fetch(Supavisor.Cache, key, loader)

    case fetched do
      {_status, {:cached, result}} -> result
      {_status, {:cached, result}, _opts} -> result
    end
  end
103

104
  @doc """
  Resolves a database user together with its tenant.

  * With both `external_id` and `sni_hostname` set to `nil`, returns an error
    without touching the database.
  * For `:cluster`, `external_id` is treated as a cluster alias: one active
    `ClusterTenants` row for that alias is picked and the lookup continues as
    `:single` against that row's `tenant_external_id`.
  * For `:single`, runs the query built by `build_user_query/3`.

  Returns `{:ok, %{user: user, tenant: tenant}}`, `{:error, :not_found}` or
  `{:error, :multiple_results}`.
  """
  @spec get_user(atom(), String.t(), String.t() | nil, String.t() | nil) ::
          {:ok, map()} | {:error, any()}
  def get_user(_, _, nil, nil) do
    {:error, "Either external_id or sni_hostname must be provided"}
  end

  def get_user(:cluster, user, external_id, sni_hostname) do
    query =
      from(ct in ClusterTenants,
        where: ct.cluster_alias == ^external_id and ct.active == true,
        limit: 1
      )

    # `limit: 1` guarantees at most one row, so there is no
    # "multiple results" outcome to handle here.
    case Repo.one(query) do
      %ClusterTenants{} = ct ->
        get_user(:single, user, ct.tenant_external_id, sni_hostname)

      nil ->
        {:error, :not_found}
    end
  end

  def get_user(:single, user, external_id, sni_hostname) do
    query = build_user_query(user, external_id, sni_hostname)

    case Repo.all(query) do
      [{%User{} = found_user, %Tenant{} = tenant}] ->
        {:ok, %{user: found_user, tenant: tenant}}

      [_ | _] ->
        {:error, :multiple_results}

      _ ->
        {:error, :not_found}
    end
  end
143

144
  @doc """
  Gets the user flagged `is_manager` for the tenant with the given `external_id`.

  Returns `nil` when the tenant has no manager user.
  """
  @spec get_manager_user(String.t()) :: User.t() | nil
  def get_manager_user(external_id) do
    query =
      from(u in User,
        where: u.tenant_external_id == ^external_id and u.is_manager == true
      )

    Repo.one(query)
  end
151

152
  @doc """
  Cached lookup of a tenant's manager credentials.

  The result of `get_manager_user/1` is stored in `Supavisor.Cache` under
  `{:manager_user_cache, external_id}`, wrapped in a `{:cached, value}` tuple.
  `ttl` is the cache entry lifetime in milliseconds; `nil` (or omitting it)
  falls back to 24 hours.

  Returns a `%ManagerSecrets{}` built from the user's `db_user` and
  `db_password`, or `nil` when the tenant has no manager user.
  """
  @spec get_manager_user_cache(String.t()) :: ManagerSecrets.t() | nil
  @spec get_manager_user_cache(String.t(), non_neg_integer() | nil) ::
          ManagerSecrets.t() | nil
  def get_manager_user_cache(external_id, ttl \\ :timer.hours(24)) do
    ttl = if is_nil(ttl), do: :timer.hours(24), else: ttl
    cache_key = {:manager_user_cache, external_id}

    fetched =
      Cachex.fetch(Supavisor.Cache, cache_key, fn _key ->
        {:commit, {:cached, get_manager_user(external_id)}, ttl: ttl}
      end)

    # The 2-tuple and 3-tuple result shapes carry the same payload.
    case fetched do
      {_, {:cached, user}} -> to_manager_secrets(user)
      {_, {:cached, user}, _} -> to_manager_secrets(user)
    end
  end

  # Converts a cached manager user (or a cached miss) into the value callers receive.
  defp to_manager_secrets(nil), do: nil

  defp to_manager_secrets(user) do
    %ManagerSecrets{
      db_user: user.db_user,
      db_password: user.db_password
    }
  end
179

180
  @doc """
  Loads the tenant with the given `external_id`, preloading into `:users` only
  the users whose `db_user` equals `user`.

  Returns a list of tenants; because of the left join the tenant is still
  returned when no user matches.
  """
  def get_pool_config(external_id, user) do
    query =
      from(t in Tenant,
        left_join: u in User,
        on: u.tenant_external_id == t.external_id and u.db_user == ^user,
        where: t.external_id == ^external_id,
        preload: [users: u]
      )

    Repo.all(query)
  end
189

190
  @doc """
  Cached variant of `get_pool_config/2`.

  The result is stored in `Supavisor.Cache` under
  `{:pool_config_cache, external_id, user}`, wrapped in a `{:cached, value}`
  tuple. `ttl` is the entry lifetime in milliseconds; `nil` (or omitting it)
  falls back to 24 hours.
  """
  def get_pool_config_cache(external_id, user, ttl \\ :timer.hours(24)) do
    ttl = if is_nil(ttl), do: :timer.hours(24), else: ttl
    key = {:pool_config_cache, external_id, user}

    loader = fn _key ->
      {:commit, {:cached, get_pool_config(external_id, user)}, ttl: ttl}
    end

    fetched = Cachex.fetch(Supavisor.Cache, key, loader)

    case fetched do
      {_status, {:cached, config}} -> config
      {_status, {:cached, config}, _opts} -> config
    end
  end
201

202
  @doc """
  Loads the active tenants of the cluster whose alias is `external_id`.

  Each `ClusterTenants` row has its `:tenant` preloaded, and each tenant has
  preloaded into `:users` only the users whose `db_user` equals `user`. The
  rows are then validated by `process_cluster/2`.

  Returns `{:error, :not_found}` when the cluster has no active tenants, or
  the `{:error, {:config, reason, tenant_external_id}}` tuple produced by
  `process_cluster/2` when a tenant fails validation.
  """
  # NOTE(review): on success the reduction yields `{require_user, cluster_tenants}`,
  # not the bare list this spec declares. The return shape is left as it was;
  # confirm against callers which of the two is intended.
  @spec get_cluster_config(String.t(), String.t()) :: [ClusterTenants.t()] | {:error, any()}
  def get_cluster_config(external_id, user) do
    users_query = from(u in User, where: u.db_user == ^user)
    tenants_query = from(t in Tenant, preload: [users: ^users_query])

    # Filter in the query itself. The second argument of `Repo.all/2` is a list
    # of Repo options, so passing `cluster_alias: ...` there filters nothing.
    from(ct in ClusterTenants,
      where: ct.cluster_alias == ^external_id and ct.active == true,
      preload: [tenant: ^tenants_query]
    )
    |> Repo.all()
    |> case do
      [] ->
        {:error, :not_found}

      cluster_tenants ->
        Enum.reduce_while(cluster_tenants, {nil, []}, &process_cluster/2)
    end
  end
220

221
  # Reducer for `Enum.reduce_while/3` over preloaded cluster tenants.
  #
  # The accumulator is `{type, acc}`: `type` is the `require_user` value of the
  # first tenant seen (nil until then) and `acc` collects accepted rows in
  # reverse order. A row is accepted only when its tenant has exactly one
  # preloaded user and the same `require_user` value as the first tenant.
  # Note that the `:multiple_users` error is also produced for zero users.
  defp process_cluster(%{tenant: tenant} = cluster, {type, acc}) do
    expected = if is_nil(type), do: tenant.require_user, else: type

    case tenant.users do
      [_single] when expected == tenant.require_user ->
        {:cont, {expected, [cluster | acc]}}

      [_single] ->
        {:halt, {:error, {:config, :different_users, tenant.external_id}}}

      _ ->
        {:halt, {:error, {:config, :multiple_users, tenant.external_id}}}
    end
  end
230

231
  @doc """
  Inserts a new tenant built from `attrs` through `Tenant.changeset/2`.

  ## Examples

      iex> create_tenant(%{field: value})
      {:ok, %Tenant{}}

      iex> create_tenant(%{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  def create_tenant(attrs \\ %{}) do
    changeset = Tenant.changeset(%Tenant{}, attrs)
    Repo.insert(changeset)
  end
248

249
  @doc """
  Applies `attrs` to an existing tenant through `Tenant.changeset/2` and persists it.

  ## Examples

      iex> update_tenant(tenant, %{field: new_value})
      {:ok, %Tenant{}}

      iex> update_tenant(tenant, %{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  def update_tenant(%Tenant{} = tenant, attrs) do
    changeset = Tenant.changeset(tenant, attrs)
    Repo.update(changeset)
  end
266

267
  @doc """
  Sets `default_parameter_status` on the tenant with the given `external_id`.

  Returns the result of `Repo.update/1`, or `{:error, :not_found}` when no
  tenant has that `external_id`.
  """
  @spec update_tenant_ps(String.t(), any()) ::
          {:ok, Tenant.t()} | {:error, Ecto.Changeset.t() | :not_found}
  def update_tenant_ps(external_id, new_ps) do
    query = from(t in Tenant, where: t.external_id == ^external_id)

    case Repo.one(query) do
      # Without this clause a missing tenant would reach Tenant.changeset/2 as nil.
      nil ->
        {:error, :not_found}

      %Tenant{} = tenant ->
        tenant
        |> Tenant.changeset(%{default_parameter_status: new_ps})
        |> Repo.update()
    end
  end
273

274
  @doc """
  Removes the given tenant from the database.

  ## Examples

      iex> delete_tenant(tenant)
      {:ok, %Tenant{}}

      iex> delete_tenant(tenant)
      {:error, %Ecto.Changeset{}}

  """
  def delete_tenant(%Tenant{} = tenant), do: Repo.delete(tenant)
289

290
  @doc """
  Deletes all tenants whose `external_id` equals `id`.

  Returns `true` when at least one row was deleted, `false` otherwise.
  """
  @spec delete_tenant_by_external_id(String.t()) :: boolean()
  def delete_tenant_by_external_id(id) do
    query = from(t in Tenant, where: t.external_id == ^id)
    {deleted, _} = Repo.delete_all(query)
    deleted > 0
  end
302

303
  @doc """
  Deletes all clusters whose `alias` equals `id`.

  Returns `true` when at least one row was deleted, `false` otherwise.
  """
  @spec delete_cluster_by_alias(String.t()) :: boolean()
  def delete_cluster_by_alias(id) do
    query = from(c in Cluster, where: c.alias == ^id)
    {deleted, _} = Repo.delete_all(query)
    deleted > 0
  end
315

316
  @doc """
  Builds an `%Ecto.Changeset{}` for the tenant without persisting anything.

  ## Examples

      iex> change_tenant(tenant)
      %Ecto.Changeset{data: %Tenant{}}

  """
  def change_tenant(%Tenant{} = tenant, attrs \\ %{}), do: Tenant.changeset(tenant, attrs)
328

329
  alias Supavisor.Tenants.User
330

331
  @doc """
  Fetches every user.

  ## Examples

      iex> list_users()
      [%User{}, ...]

  """
  def list_users, do: Repo.all(User)
343

344
  @doc """
  Inserts a new user built from `attrs` through `User.changeset/2`.

  ## Examples

      iex> create_user(%{field: value})
      {:ok, %User{}}

      iex> create_user(%{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  def create_user(attrs \\ %{}) do
    changeset = User.changeset(%User{}, attrs)
    Repo.insert(changeset)
  end
361

362
  @doc """
  Applies `attrs` to an existing user through `User.changeset/2` and persists it.

  ## Examples

      iex> update_user(user, %{field: new_value})
      {:ok, %User{}}

      iex> update_user(user, %{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  def update_user(%User{} = user, attrs) do
    changeset = User.changeset(user, attrs)
    Repo.update(changeset)
  end
379

380
  @doc """
  Updates the manager user credentials for a tenant.

  After the database update succeeds, this function also pushes the new
  credentials to the secret checkers globally and then clears the tenant's
  distributed cache.

  Returns `:ok` on success, `{:error, :no_manager_user}` when the tenant has no
  user flagged `is_manager`, or `{:error, %Ecto.Changeset{}}` when the update
  is rejected.

  ## Examples

      iex> update_manager_user_credentials(tenant, %{"db_user" => "new_user", "db_password" => "new_pass"})
      :ok

      iex> update_manager_user_credentials(tenant_without_manager, %{})
      {:error, :no_manager_user}

  """
  @spec update_manager_user_credentials(Tenant.t(), map()) ::
          :ok | {:error, :no_manager_user | Ecto.Changeset.t()}
  def update_manager_user_credentials(%Tenant{} = tenant, attrs) do
    tenant = Repo.preload(tenant, :users)

    with %User{} = manager_user <- Enum.find(tenant.users, & &1.is_manager),
         {:ok, updated_user} <-
           manager_user |> User.credentials_changeset(attrs) |> Repo.update() do
      propagate_manager_credentials(tenant.external_id, updated_user)
    else
      nil -> {:error, :no_manager_user}
      {:error, changeset} -> {:error, changeset}
    end
  end

  # Pushes freshly stored manager credentials to the secret checkers, then
  # drops the tenant's cached entries. Always returns :ok; the results of both
  # calls are only logged.
  defp propagate_manager_credentials(external_id, updated_user) do
    Logger.info(
      "Updating auth credentials for tenant #{external_id}, user #{updated_user.db_user}"
    )

    password_fn = fn -> updated_user.db_password end

    checker_result =
      Supavisor.update_secret_checker_credentials_global(
        external_id,
        updated_user.db_user,
        password_fn
      )

    Logger.info("Update SecretChecker credentials #{external_id}: #{inspect(checker_result)}")

    # Clear cache after SecretCheckers are updated to avoid race condition
    cleanup_result = Supavisor.del_all_cache_dist(external_id)
    Logger.info("Delete cache dist #{external_id}: #{inspect(cleanup_result)}")

    :ok
  end
434

435
  @doc """
  Removes the given user from the database.

  ## Examples

      iex> delete_user(user)
      {:ok, %User{}}

      iex> delete_user(user)
      {:error, %Ecto.Changeset{}}

  """
  def delete_user(%User{} = user), do: Repo.delete(user)
450

451
  @doc """
  Builds an `%Ecto.Changeset{}` for the user without persisting anything.

  ## Examples

      iex> change_user(user)
      %Ecto.Changeset{data: %User{}}

  """
  def change_user(%User{} = user, attrs \\ %{}), do: User.changeset(user, attrs)
463

464
  # Builds the query selecting `{user, tenant}` pairs for a login attempt.
  #
  # A user row qualifies when its `db_user` equals `user`, or when it is the
  # manager user of a tenant with `require_user == false`. The tenant itself is
  # restricted by `with_tenant/2`.
  @spec build_user_query(String.t(), String.t() | nil, String.t() | nil) ::
          Ecto.Queryable.t()
  defp build_user_query(user, external_id, sni_hostname) do
    tenant_filter = with_tenant(external_id, sni_hostname)

    users_with_tenant =
      from(u in User,
        join: t in Tenant,
        on: u.tenant_external_id == t.external_id,
        where:
          u.db_user == ^user or
            (t.require_user == false and u.is_manager == true),
        select: {u, t}
      )

    where(users_with_tenant, ^tenant_filter)
  end
477

478
  # Dynamic filter on the tenant, which is the second binding of the query:
  # by `sni_hostname` when `external_id` is nil, by `external_id` otherwise.
  defp with_tenant(external_id, sni_hostname) do
    case external_id do
      nil -> dynamic([_user, tenant], tenant.sni_hostname == ^sni_hostname)
      _ -> dynamic([_user, tenant], tenant.external_id == ^external_id)
    end
  end
485

486
  alias Supavisor.Tenants.Cluster
487

488
  @doc """
  Fetches every cluster.

  ## Examples

      iex> list_clusters()
      [%Cluster{}, ...]

  """
  def list_clusters, do: Repo.all(Cluster)
500

501
  @doc """
  Looks up one cluster by primary key.

  Raises `Ecto.NoResultsError` when no cluster has the given id.

  ## Examples

      iex> get_cluster!(123)
      %Cluster{}

      iex> get_cluster!(456)
      ** (Ecto.NoResultsError)

  """
  def get_cluster!(id) do
    Repo.get!(Cluster, id)
  end
3✔
516

517
  @doc """
  Gets a cluster by primary key with `:cluster_tenants` preloaded.

  Returns `{:ok, cluster}` or `{:error, :not_found}`.
  """
  @spec get_cluster_with_rel(String.t()) :: {:ok, Cluster.t()} | {:error, any()}
  def get_cluster_with_rel(id) do
    with %Cluster{} = cluster <- Repo.get(Cluster, id) do
      {:ok, Repo.preload(cluster, :cluster_tenants)}
    else
      nil -> {:error, :not_found}
    end
  end
527

528
  @doc """
  Inserts a new cluster built from `attrs` through `Cluster.changeset/2`.

  ## Examples

      iex> create_cluster(%{field: value})
      {:ok, %Cluster{}}

      iex> create_cluster(%{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  def create_cluster(attrs \\ %{}) do
    changeset = Cluster.changeset(%Cluster{}, attrs)
    Repo.insert(changeset)
  end
545

546
  @doc """
  Applies `attrs` to an existing cluster through `Cluster.changeset/2` and persists it.

  ## Examples

      iex> update_cluster(cluster, %{field: new_value})
      {:ok, %Cluster{}}

      iex> update_cluster(cluster, %{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  def update_cluster(%Cluster{} = cluster, attrs) do
    changeset = Cluster.changeset(cluster, attrs)
    Repo.update(changeset)
  end
563

564
  @doc """
  Removes the given cluster from the database.

  ## Examples

      iex> delete_cluster(cluster)
      {:ok, %Cluster{}}

      iex> delete_cluster(cluster)
      {:error, %Ecto.Changeset{}}

  """
  def delete_cluster(%Cluster{} = cluster), do: Repo.delete(cluster)
579

580
  @doc """
  Builds an `%Ecto.Changeset{}` for the cluster without persisting anything.

  ## Examples

      iex> change_cluster(cluster)
      %Ecto.Changeset{data: %Cluster{}}

  """
  def change_cluster(%Cluster{} = cluster, attrs \\ %{}), do: Cluster.changeset(cluster, attrs)
592

593
  alias Supavisor.Tenants.ClusterTenants
594

595
  @doc """
  Fetches every cluster_tenants record.

  ## Examples

      iex> list_cluster_tenants()
      [%ClusterTenants{}, ...]

  """
  def list_cluster_tenants, do: Repo.all(ClusterTenants)
607

608
  @doc """
  Looks up one cluster_tenants record by primary key.

  Raises `Ecto.NoResultsError` when no record has the given id.

  ## Examples

      iex> get_cluster_tenants!(123)
      %ClusterTenants{}

      iex> get_cluster_tenants!(456)
      ** (Ecto.NoResultsError)

  """
  def get_cluster_tenants!(id) do
    Repo.get!(ClusterTenants, id)
  end
×
623

624
  @doc """
  Inserts a new cluster_tenants record built from `attrs` through
  `ClusterTenants.changeset/2`.

  ## Examples

      iex> create_cluster_tenants(%{field: value})
      {:ok, %ClusterTenants{}}

      iex> create_cluster_tenants(%{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  def create_cluster_tenants(attrs \\ %{}) do
    changeset = ClusterTenants.changeset(%ClusterTenants{}, attrs)
    Repo.insert(changeset)
  end
641

642
  @doc """
  Applies `attrs` to an existing cluster_tenants record through
  `ClusterTenants.changeset/2` and persists it.

  ## Examples

      iex> update_cluster_tenants(cluster_tenants, %{field: new_value})
      {:ok, %ClusterTenants{}}

      iex> update_cluster_tenants(cluster_tenants, %{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  def update_cluster_tenants(%ClusterTenants{} = cluster_tenants, attrs) do
    changeset = ClusterTenants.changeset(cluster_tenants, attrs)
    Repo.update(changeset)
  end
659

660
  @doc """
  Removes the given cluster_tenants record from the database.

  ## Examples

      iex> delete_cluster_tenants(cluster_tenants)
      {:ok, %ClusterTenants{}}

      iex> delete_cluster_tenants(cluster_tenants)
      {:error, %Ecto.Changeset{}}

  """
  def delete_cluster_tenants(%ClusterTenants{} = cluster_tenants),
    do: Repo.delete(cluster_tenants)
675

676
  @doc """
  Builds an `%Ecto.Changeset{}` for the cluster_tenants record without
  persisting anything.

  ## Examples

      iex> change_cluster_tenants(cluster_tenants)
      %Ecto.Changeset{data: %ClusterTenants{}}

  """
  def change_cluster_tenants(%ClusterTenants{} = cluster_tenants, attrs \\ %{}),
    do: ClusterTenants.changeset(cluster_tenants, attrs)
688
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc