• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

coryodaniel / bonny / 6242b303d99f7b9b2648e21dcc9425036f44d7a6-PR-309

24 Feb 2025 07:21AM UTC coverage: 83.282% (+4.9%) from 78.374%
6242b303d99f7b9b2648e21dcc9425036f44d7a6-PR-309

Pull #309

github

web-flow
Bump ex_doc from 0.37.0 to 0.37.2

Bumps [ex_doc](https://github.com/elixir-lang/ex_doc) from 0.37.0 to 0.37.2.
- [Release notes](https://github.com/elixir-lang/ex_doc/releases)
- [Changelog](https://github.com/elixir-lang/ex_doc/blob/main/CHANGELOG.md)
- [Commits](https://github.com/elixir-lang/ex_doc/compare/v0.37.0...v0.37.2)

---
updated-dependencies:
- dependency-name: ex_doc
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Pull Request #309: Bump ex_doc from 0.37.0 to 0.37.2

543 of 652 relevant lines covered (83.28%)

13.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.73
/lib/bonny/operator/leader_elector.ex
1
defmodule Bonny.Operator.LeaderElector do
2
  @moduledoc """
3
  The leader elector uses a [Kubernetes
4
  Lease](https://kubernetes.io/docs/concepts/architecture/leases/) to make sure
5
  the operator only runs on one single replica (the leader) at the same time.
6

7
  ## Enabling the Leader Election
8

9
  > #### Functionality still in Beta {: .warning}
10
  >
11
  > The leader election is still being tested. Enable it for testing purposes
12
  > only and please report any issues on GitHub.
13

14
  To enable leader election you have to pass the `enable_leader_election: true` option when [adding the operator to your Supervisor](#adding-the-operator-to-your-supervisor):
15

16
  ```elixir
17
  defmodule MyOperator.Application do
18
    use Application
19

20
    def start(_type, env: env) do
21
      children = [
22
        {MyOperator.Operator,
23
        conn: MyOperator.K8sConn.get!(env),
24
        watch_namespace: :all,
25
        enable_leader_election: true} # <-- starts the leader elector
26
      ]
27

28
      opts = [strategy: :one_for_one, name: MyOperator.Supervisor]
29
      Supervisor.start_link(children, opts)
30
    end
31
  end
32
  ```
33
  """
34

35
  use GenServer
36

37
  import YamlElixir.Sigil
38

39
  require Logger
40

41
  # All three timings below are expressed in seconds.

  # Validity of a lease, counted from its last renewal. This value is written
  # to the lease as `leaseDurationSeconds`; a candidate that is not the holder
  # may only take over once that much time has passed since `renewTime`.
  @lease_duration 15

  # Pause between two leadership evaluations while this instance is the
  # leader, i.e. how often the lease gets renewed.
  @renew_deadline 10

  # Pause between two leadership evaluations while this instance is not the
  # leader, i.e. how often acquisition is retried.
  @retry_period 2

  defstruct controllers: nil, operator: nil, init_args: nil, conn: nil, operator_pid: nil
55

56
  @doc """
  Starts the leader elector linked to the calling process and triggers the
  first leadership evaluation right away.

    * `controllers` and `operator` are handed over unchanged to
      `Bonny.Operator.Supervisor.start_link/3` once leadership is acquired.
    * `init_args` must contain the `:conn` used to talk to the cluster.

  Returns whatever `GenServer.start_link/3` returns, so a failed start is
  reported to the caller instead of raising a `MatchError`.
  """
  @spec start_link(controllers :: list(), operator :: atom(), init_args :: Keyword.t()) ::
          GenServer.on_start()
  def start_link(controllers, operator, init_args) do
    with {:ok, pid} <- GenServer.start_link(__MODULE__, {controllers, operator, init_args}) do
      # Evaluate leadership immediately instead of waiting for the first timer.
      send(pid, :maybe_acquire_leadership)
      {:ok, pid}
    end
  end
63

64
  @impl true
  def init({controllers, operator, init_args}) do
    # `:conn` is mandatory - `Keyword.fetch!/2` raises when it is missing.
    initial_state = %__MODULE__{
      controllers: controllers,
      operator: operator,
      conn: Keyword.fetch!(init_args, :conn),
      init_args: init_args
    }

    {:ok, initial_state}
  end
76

77
  # Periodic leadership evaluation.
  #
  # Tries to acquire or renew the lease and reconciles the locally running
  # operator with the outcome:
  #
  #   * lease held and operator already running -> nothing to do
  #   * lease held and operator not running     -> start and monitor it
  #   * lease lost while operator is running    -> stop the operator
  #   * lease not held and operator not running -> nothing to do
  #
  # Afterwards the next evaluation is scheduled: after @renew_deadline seconds
  # when leading, after @retry_period seconds otherwise.
  @impl true
  def handle_info(:maybe_acquire_leadership, state) do
    am_i_leader? = not is_nil(state.operator_pid)

    Logger.debug("{Operator=#{inspect(state.operator)}} - Starting leadership evaluation",
      library: :bonny
    )

    state =
      case acquire_or_renew(state.conn, state.operator) do
        :ok when am_i_leader? ->
          Logger.debug(
            "{Operator=#{inspect(state.operator)}} - I am the leader - I stay the leader.",
            library: :bonny
          )

          state

        :ok ->
          Logger.debug(
            "{Operator=#{inspect(state.operator)}} - I am the new leader. Starting the operator.",
            library: :bonny
          )

          {:ok, pid} =
            Bonny.Operator.Supervisor.start_link(
              state.controllers,
              state.operator,
              state.init_args
            )

          ref = Process.monitor(pid)
          struct!(state, operator_pid: {pid, ref})

        _other when am_i_leader? ->
          Logger.debug(
            "{Operator=#{inspect(state.operator)}} - I was the leader but somebody else took over leadership. Terminating operator.",
            library: :bonny
          )

          {pid, ref} = state.operator_pid

          # We stop the operator on purpose, so drop the monitor first (flushing
          # a :DOWN that might already be queued). Otherwise the resulting :DOWN
          # message would arrive while `operator_pid` is nil and be treated as
          # an unexpected crash.
          Process.demonitor(ref, [:flush])
          Process.exit(pid, :shutdown)
          struct!(state, operator_pid: nil)

        _other ->
          Logger.debug("{Operator=#{inspect(state.operator)}} - Somebody else is the leader.",
            library: :bonny
          )

          state
      end

    timeout = if is_nil(state.operator_pid), do: @retry_period, else: @renew_deadline
    # The module attributes are in seconds, `Process.send_after/3` expects ms.
    Process.send_after(self(), :maybe_acquire_leadership, timeout * 1000)
    {:noreply, state}
  end
133

134
  # The monitored operator went down: give up the lease and forget the
  # operator. The callback has to return `{:noreply, state}` - returning the
  # bare struct is an invalid GenServer return value and would crash this
  # process.
  def handle_info(
        {:DOWN, ref, :process, pid, _reason},
        %__MODULE__{operator_pid: {pid, ref}} = state
      ) do
    Logger.warning(
      "{Operator=#{inspect(state.operator)}} - Uh-oh! Our operator just went down. Guess that means I have to give up leadership. Boohoo!",
      library: :bonny
    )

    release(state.conn, state.operator)
    {:noreply, struct!(state, operator_pid: nil)}
  end

  # A :DOWN message that does not belong to the operator currently tracked in
  # the state. Release the lease defensively and carry on.
  def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
    Logger.warning(
      "{Operator=#{inspect(state.operator)}} - Very strange. A process I'm monitoring went down. But I'm not the leader. Looks like a bug in Bonny. Anyway, releaseing the lock if I have it.",
      library: :bonny
    )

    release(state.conn, state.operator)
    {:noreply, struct!(state, operator_pid: nil)}
  end
156

157
  # Shutdown while an operator is running: release the lease and stop the
  # operator. GenServer ignores the return value of `terminate/2`.
  @impl true
  def terminate(_, %__MODULE__{operator_pid: {pid, _ref}} = state) do
    Logger.debug(
      "{Operator=#{inspect(state.operator)}} - I'm going down - releasing the lock now.",
      library: :bonny
    )

    release(state.conn, state.operator)
    Process.exit(pid, :shutdown)
    struct!(state, operator_pid: nil)
  end

  # Shutdown while no operator is running: nothing to clean up.
  def terminate(_, state) do
    # Tagged with `library: :bonny` like every other log call in this module.
    Logger.debug(
      "{Operator=#{inspect(state.operator)}} - I'm going down but I'm not the leader so chill!",
      library: :bonny
    )

    state
  end
176

177
  # Gives up the lease if - and only if - this instance is its current holder,
  # by shortening `leaseDurationSeconds` to 1 and applying the lease.
  #
  # Best effort: lookup failures, leases held by somebody else and the result
  # of the apply call are all ignored. Always returns `:ok`.
  defp release(conn, operator) do
    holder = Bonny.Config.instance_name()

    with {:ok, %{"spec" => %{"holderIdentity" => ^holder}} = current_lease} <-
           get_lease(conn, operator) do
      shortened_lease = put_in(current_lease, ~w(spec leaseDurationSeconds), 1)
      Bonny.Resource.apply(shortened_lease, conn, [])
    end

    :ok
  end
195

196
  # Tries to become (or stay) the holder of the operator's lease.
  #
  # Returns
  #
  #   * `:ok`     - this instance holds the lease now
  #   * `:locked` - somebody else holds a lease that has not expired yet
  #   * `:error`  - the API call failed
  #
  # Unexpected API errors are mapped to `:error` rather than raising a
  # `CaseClauseError`, so the caller handles them like any other failed
  # acquisition.
  defp acquire_or_renew(conn, operator) do
    now = DateTime.utc_now()
    my_lease = lease(now, @lease_duration, operator)

    case get_lease(conn, operator) do
      {:error, %K8s.Client.APIError{reason: "NotFound"}} ->
        create_lease(conn, operator, my_lease)

      {:ok, old_lease} ->
        if locked_by_sbdy_else?(now, old_lease, my_lease) do
          Logger.debug(
            ~s({Operator=#{inspect(operator)}} - Lock is held by "#{old_lease["spec"]["holderIdentity"]}" and has not yet expired.),
            library: :bonny
          )

          :locked
        else
          renew_or_take_over_lease(conn, operator, old_lease, my_lease)
        end

      {:error, error} ->
        Logger.warning(
          "{Operator=#{inspect(operator)}} - Failed fetching the lease. #{inspect(error)}",
          library: :bonny
        )

        :error
    end
  end

  # Creates the lease when none exists yet. Losing the creation race against
  # another instance is reported as `:locked`.
  defp create_lease(conn, operator, my_lease) do
    Logger.debug("{Operator=#{inspect(operator)}} - Lease not found. Trying to create it.",
      library: :bonny
    )

    result =
      K8s.Client.create(my_lease)
      |> K8s.Client.put_conn(conn)
      |> K8s.Client.run()

    case result do
      {:ok, _} ->
        Logger.debug("{Operator=#{inspect(operator)}} - Lease successfully created.",
          library: :bonny
        )

        :ok

      {:error, %K8s.Client.APIError{reason: "AlreadyExists"}} ->
        Logger.debug(
          "{Operator=#{inspect(operator)}} - Failed creating lease. Seems to have been created by somebody else in the meantime.",
          library: :bonny
        )

        :locked

      {:error, error} ->
        Logger.warning(
          "{Operator=#{inspect(operator)}} - Failed creating lease. #{inspect(error)}",
          library: :bonny
        )

        :error
    end
  end

  # Applies our lease on top of an existing one that is either already ours
  # (renewal, keeps the original `acquireTime`) or expired (take-over). The old
  # lease's `resourceVersion` is carried over in both cases.
  defp renew_or_take_over_lease(conn, operator, old_lease, my_lease) do
    my_lease =
      if old_lease["spec"]["holderIdentity"] == my_lease["spec"]["holderIdentity"] do
        Logger.debug(
          "{Operator=#{inspect(operator)}} - I'm holding the lock. Trying to renew it",
          library: :bonny
        )

        my_lease
        |> put_in(~w(spec acquireTime), old_lease["spec"]["acquireTime"])
        |> put_in(~w(metadata resourceVersion), old_lease["metadata"]["resourceVersion"])
      else
        Logger.debug(
          ~s({Operator=#{inspect(operator)}} - Lock is held by "#{old_lease["spec"]["holderIdentity"]}" but has expired. Trying to acquire it.),
          library: :bonny
        )

        put_in(my_lease, ~w(metadata resourceVersion), old_lease["metadata"]["resourceVersion"])
      end

    case Bonny.Resource.apply(my_lease, conn, []) do
      {:ok, _} ->
        Logger.debug(
          ~s({Operator=#{inspect(operator)}} - Lock successfully acquired/renewed.),
          library: :bonny
        )

        :ok

      {:error, exception} when is_exception(exception) ->
        Logger.debug(
          ~s({Operator=#{inspect(operator)}} - Failed aquiring/renewing the lock. #{Exception.message(exception)}),
          library: :bonny
        )

        :error

      {:error, error} ->
        Logger.warning(
          "{Operator=#{inspect(operator)}} - Failed acquiring/renewing the lock. #{inspect(error)}",
          library: :bonny
        )

        :error
    end
  end
278

279
  # Tells whether `old_lease` is currently held by another instance, i.e. it
  # has a non-empty holder that differs from the holder of `my_lease` and it
  # has not expired at `now` (`renewTime` + `leaseDurationSeconds` is still in
  # the future).
  #
  # A lease without holder (`nil` or `""`) is never considered locked. The
  # timestamps are only parsed when the holder check alone cannot decide, so a
  # holder-less lease with incomplete timing data does not raise.
  defp locked_by_sbdy_else?(now, %{"spec" => old_lease_spec}, my_lease) do
    holder = old_lease_spec["holderIdentity"]
    held_by_sbdy_else? = holder not in [nil, ""] and holder != my_lease["spec"]["holderIdentity"]

    if held_by_sbdy_else? do
      # The API is expected to deliver UTC timestamps, hence the offset of 0.
      {:ok, last_renew, 0} = DateTime.from_iso8601(old_lease_spec["renewTime"])
      time_of_expiration = DateTime.add(last_renew, old_lease_spec["leaseDurationSeconds"])

      DateTime.compare(time_of_expiration, now) == :gt
    else
      false
    end
  end
287

288
  # Builds the name of the lease for `operator`:
  # "<namespace>-<name>-<lower-case hex prefix of the SHA-1 of the module name>".
  defp lease_name(operator) do
    digest = :crypto.hash(:sha, Atom.to_string(operator))

    # NOTE(review): `String.slice/2` works on graphemes rather than bytes, so
    # on a raw digest the prefix is presumably not always exactly 16 bytes.
    # Left untouched because a different slice would rename existing leases.
    digest_prefix =
      digest
      |> String.slice(0..15)
      |> Base.encode16(case: :lower)

    Enum.join([Bonny.Config.namespace(), Bonny.Config.name(), digest_prefix], "-")
  end
297

298
  # Fetches the operator's lease from the cluster and returns the result of
  # `K8s.Client.run/1` as is.
  defp get_lease(conn, operator) do
    operation =
      K8s.Client.get("coordination.k8s.io/v1", "Lease",
        name: lease_name(operator),
        namespace: Bonny.Config.namespace()
      )

    request = K8s.Client.put_conn(operation, conn)
    K8s.Client.run(request)
  end
306

307
  # Builds the Lease manifest this instance would like to hold: this instance
  # is the holder, and both `renewTime` and `acquireTime` are set to `now`.
  defp lease(now, lease_duration, operator) do
    name = lease_name(operator)
    namespace = Bonny.Config.namespace()
    holder = Bonny.Config.instance_name()
    timestamp = DateTime.to_iso8601(now)

    ~y"""
    apiVersion: coordination.k8s.io/v1
    kind: Lease
    metadata:
      name: #{name}
      namespace: #{namespace}
    spec:
      holderIdentity: #{holder}
      leaseDurationSeconds: #{lease_duration}
      renewTime: #{timestamp}
      acquireTime: #{timestamp}
    """
  end
321
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc