• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supavisor / 19370957114

14 Nov 2025 04:30PM UTC coverage: 62.682% (+1.4%) from 61.246%
19370957114

Pull #744

github

web-flow
Merge fd252a012 into 0224a24c8
Pull Request #744: fix(defrag): improve statems, caching, logs, circuit breaking

592 of 785 new or added lines in 22 files covered. (75.41%)

18 existing lines in 5 files now uncovered.

1809 of 2886 relevant lines covered (62.68%)

4508.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.5
/lib/supavisor/circuit_breaker.ex
1
defmodule Supavisor.CircuitBreaker do
  @moduledoc """
  Simple ETS-based circuit breaker for pool operations.

  Tracks failures per tenant and blocks operations when thresholds are exceeded.

  State lives in a named, public ETS table keyed by `{tenant, operation}`. Each
  value is a map with two keys:

    * `:failures` - failure timestamps in seconds, newest first
    * `:blocked_until` - `nil`, or the second at which the block expires

  All timestamps come from `System.system_time(:second)`. Updates are plain
  read-then-write sequences against the table; no locking is performed here.
  """

  require Logger

  @table __MODULE__

  @thresholds %{
    get_secrets: %{
      max_failures: 5,
      window_seconds: 600,
      block_seconds: 600,
      explanation: "Failed to retrieve database credentials"
    },
    db_connection: %{
      max_failures: 100,
      window_seconds: 300,
      block_seconds: 600,
      explanation: "Unable to establish connection to upstream database"
    }
  }

  # Largest failure window across all operations. The thresholds are fixed at
  # compile time, so this is computed once instead of on every cleanup run.
  @max_window_seconds @thresholds
                      |> Map.values()
                      |> Enum.map(& &1.window_seconds)
                      |> Enum.max()

  @doc """
  Initializes the circuit breaker ETS table.

  Called by the application supervisor. The table is created as a named, public
  `:set` with `read_concurrency: true`, and the result of `:ets.new/2` is returned.
  `:ets.new/2` raises `ArgumentError` if a table with this name already exists.
  """
  @spec init() :: :ets.tab()
  def init do
    :ets.new(@table, [:named_table, :public, :set, read_concurrency: true])
  end

  @doc """
  Records a failure for a given tenant and operation.

  The failure is timestamped with the current second and added to the tenant's
  history; timestamps older than the operation's `window_seconds` are dropped.
  When the number of failures inside the window reaches `max_failures`, the
  circuit is opened until `now + block_seconds` and a warning is logged.

  Raises `KeyError` if `operation` has no configured threshold. This is checked
  before any state is written, so an unknown operation never leaves an entry
  behind.
  """
  @spec record_failure(String.t(), atom()) :: :ok
  def record_failure(tenant, operation) when is_binary(tenant) and is_atom(operation) do
    # Resolve the threshold first: an unknown operation must fail on the very
    # first call rather than being stored and only raising on the second one.
    threshold = Map.fetch!(@thresholds, operation)
    now = System.system_time(:second)
    key = {tenant, operation}

    # A missing entry is treated as an empty history so the first failure goes
    # through exactly the same threshold evaluation as every later one.
    state =
      case :ets.lookup(@table, key) do
        [] -> %{failures: [], blocked_until: nil}
        [{^key, existing}] -> existing
      end

    window_start = now - threshold.window_seconds
    recent_failures = Enum.filter([now | state.failures], &(&1 >= window_start))

    blocked_until =
      if length(recent_failures) >= threshold.max_failures do
        open_circuit(tenant, operation, now + threshold.block_seconds)
      else
        # Below the threshold: keep whatever block (if any) was already recorded.
        state.blocked_until
      end

    :ets.insert(@table, {key, %{failures: recent_failures, blocked_until: blocked_until}})

    :ok
  end

  @doc """
  Checks if a circuit breaker is open for a given tenant and operation.

  Returns `:ok` if the operation is allowed, `{:error, :circuit_open, blocked_until}`
  otherwise, where `blocked_until` is the second at which the block expires.

  When a stored block has already expired, the entry's `:blocked_until` is reset
  to `nil` (the failure history is kept) and `:ok` is returned.
  """
  @spec check(String.t(), atom()) :: :ok | {:error, :circuit_open, integer()}
  def check(tenant, operation) when is_binary(tenant) and is_atom(operation) do
    now = System.system_time(:second)
    key = {tenant, operation}

    case :ets.lookup(@table, key) do
      # Never failed, or state was cleared/cleaned up.
      [] ->
        :ok

      # Failures recorded, but the circuit has not been opened.
      [{^key, %{blocked_until: nil}}] ->
        :ok

      # Block is still in effect.
      [{^key, %{blocked_until: blocked_until}}] when blocked_until > now ->
        {:error, :circuit_open, blocked_until}

      # Block has expired: close the circuit but keep the failure history.
      [{^key, state}] ->
        :ets.insert(@table, {key, %{state | blocked_until: nil}})
        :ok
    end
  end

  @doc """
  Clears circuit breaker state for a tenant and operation.

  Deletes the `{tenant, operation}` entry, dropping both the failure history
  and any block. Always returns `:ok`.
  """
  @spec clear(String.t(), atom()) :: :ok
  def clear(tenant, operation) when is_binary(tenant) and is_atom(operation) do
    :ets.delete(@table, {tenant, operation})
    :ok
  end

  @doc """
  Returns the user-facing explanation for a given operation.

  Raises `KeyError` if `operation` has no configured threshold.
  """
  @spec explanation(atom()) :: String.t()
  def explanation(operation) when is_atom(operation) do
    @thresholds
    |> Map.fetch!(operation)
    |> Map.fetch!(:explanation)
  end

  @doc """
  Removes stale entries from the circuit breaker table.

  Called periodically by the Janitor process. An entry is stale when its most
  recent failure is older than twice the largest configured failure window and
  it has no block, or its block has already expired.

  Returns the number of entries deleted.
  """
  @spec cleanup_stale_entries() :: non_neg_integer()
  def cleanup_stale_entries do
    now = System.system_time(:second)
    cutoff = now - @max_window_seconds * 2

    # Flatten every `{{tenant, operation}, state}` row into a 4-tuple.
    match_spec = [
      {{{:"$1", :"$2"}, %{failures: :"$3", blocked_until: :"$4"}}, [],
       [{{:"$1", :"$2", :"$3", :"$4"}}]}
    ]

    deleted =
      @table
      |> :ets.select(match_spec)
      |> Enum.reduce(0, fn {tenant, operation, failures, blocked_until}, acc ->
        if stale?(failures, blocked_until, now, cutoff) do
          :ets.delete(@table, {tenant, operation})
          acc + 1
        else
          acc
        end
      end)

    if deleted > 0 do
      Logger.debug("Circuit breaker cleaned up #{deleted} stale entries")
    end

    deleted
  end

  # Logs that the circuit was opened and returns the new block expiry.
  defp open_circuit(tenant, operation, block_until) do
    Logger.warning(
      "Circuit breaker opened for tenant=#{tenant} operation=#{operation} until=#{block_until}"
    )

    block_until
  end

  # True when the newest failure predates `cutoff` and no block is in effect.
  defp stale?(failures, blocked_until, now, cutoff) do
    # Failures are stored newest first; an empty history counts as timestamp 0.
    latest_failure = List.first(failures) || 0
    block_inactive? = is_nil(blocked_until) or blocked_until < now

    latest_failure < cutoff and block_inactive?
  end
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc