• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

coryodaniel / k8s / 3ad638d4ead66a6b92e146babfaa4886000cf7e7

18 Mar 2024 07:31AM UTC coverage: 84.249%. Remained the same
3ad638d4ead66a6b92e146babfaa4886000cf7e7

push

github

web-flow
Merge pull request #309 from coryodaniel/dependabot/github_actions/actions/checkout-4.1.2

Bump actions/checkout from 4.1.1 to 4.1.2

920 of 1092 relevant lines covered (84.25%)

335.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.56
/lib/k8s/conn/auth/exec_worker.ex
1
defmodule K8s.Conn.Auth.ExecWorker do
2
  @moduledoc false
3
  use GenServer
4

5
  alias K8s.Conn.Error
6

7
  defmodule State do
    @moduledoc """
    The state of the exec worker.

    Holds the exec plugin invocation (`command`, `args`, `env`), the most
    recently obtained `token` with its optional `expiration_timestamp`, the
    maximum `refresh_interval` in milliseconds, and the reference of the
    currently pending refresh `timer`.
    """

    # `env` defaults to an empty map (rather than nil) because it is handed
    # straight to `System.cmd/3`, which needs an enumerable of name/value pairs.
    defstruct [
      :command,
      env: %{},
      args: [],
      token: nil,
      expiration_timestamp: nil,
      refresh_interval: 60_000,
      timer: nil
    ]

    @typedoc """
    Simplified version of [ExecConfig](https://kubernetes.io/docs/reference/config-api/kubeconfig.v1/#ExecConfig)
    """
    @type t :: %__MODULE__{
            command: String.t(),
            env: %{optional(String.t()) => String.t() | nil},
            args: list(String.t()),
            token: binary() | nil,
            expiration_timestamp: DateTime.t() | nil,
            refresh_interval: non_neg_integer(),
            timer: reference() | nil
          }
  end
35

36
  # Option keys that configure the worker state; everything else in the
  # options passed to `start_link/1` is forwarded to `GenServer.start_link/3`.
  @state_opts [:command, :env, :args, :token, :expiration_timestamp, :refresh_interval]

  @doc """
  Starts the exec worker.

  Keys listed in `@state_opts` are used to build the worker state, the
  remaining options are given to `GenServer.start_link/3` as server options.
  """
  @spec start_link(any()) :: :ignore | {:error, any()} | {:ok, pid()}
  def start_link(opts) do
    {state_args, genserver_opts} = Keyword.split(opts, @state_opts)
    GenServer.start_link(__MODULE__, state_args, genserver_opts)
  end
44

45
  @doc """
  Builds the `:via` tuple that addresses the worker registered under `id`
  in `K8s.Conn.Auth.Registry`.
  """
  @spec via_tuple(any()) ::
          {:via, Registry, {K8s.Conn.Auth.Registry, {K8s.Conn.Auth.ExecWorker, any()}}}
  def via_tuple(id) do
    registry_key = {__MODULE__, id}
    {:via, Registry, {K8s.Conn.Auth.Registry, registry_key}}
  end
50

51
  @doc """
  Turns the `"exec"` section of a kube context into worker options.

  Returns a keyword list with `:command`, `:env` and `:args`. `"args"` and
  `"env"` are optional in the config; a missing value becomes an empty list
  before `"env"` is converted to a name => value map. Anything that does not
  contain an `"exec"` map with a `"command"` yields `[]`.
  """
  @spec parse_opts(any()) :: Keyword.t()
  def parse_opts(%{"exec" => %{"command" => command} = exec_config}) do
    [
      command: command,
      env: exec_config |> Map.get("env") |> List.wrap() |> format_env(),
      args: exec_config |> Map.get("args") |> List.wrap()
    ]
  end

  def parse_opts(_unsupported), do: []
×
69

70
  # Builds the initial state from the given options (raising on unknown keys
  # via `struct!/2`) and arms the first refresh timer.
  @spec init(Keyword.t()) :: {:ok, State.t()}
  @impl true
  def init(args) do
    initial_state =
      State
      |> struct!(args)
      |> schedule_refresh()

    {:ok, initial_state}
  end
77

78
  # Serves `:get_token`: replies with the cached token while it is still
  # valid, otherwise runs the exec command to obtain a new one.
  @spec handle_call(atom(), any(), State.t()) ::
          {:reply, {:ok, binary()}, State.t()}
          | {:reply, {:error, Error.t()}, State.t()}
  @impl true
  def handle_call(:get_token, _from, %State{token: cached_token} = state) do
    # `valid_token?/1` is false for a missing token, so a single check covers
    # both "nothing cached yet" and "cached token expired".
    case valid_token?(state) do
      true -> {:reply, {:ok, cached_token}, state}
      false -> generate_new_token(state)
    end
  end
94

95
  @doc """
  Asks the worker `target` for a token.

  The worker answers from its cache when possible; otherwise it runs the
  exec command synchronously before replying. `timeout` is the
  `GenServer.call/3` timeout (milliseconds or `:infinity`) and defaults to
  5000, so callers with slow credential plugins can allow more time.
  """
  @spec get_token(GenServer.server(), timeout()) ::
          {:ok, binary()} | {:error, Jason.DecodeError.t() | Error.t()}
  def get_token(target, timeout \\ 5_000) do
    GenServer.call(target, :get_token, timeout)
  end
100

101
  # Timer-driven refresh: re-runs the exec command. On success the new token
  # is stored and the next refresh is planned from its expiration; on failure
  # the current state is kept and only a new timer is armed.
  @impl true
  @spec handle_info(any(), State.t()) :: {:noreply, State.t()}
  def handle_info(:refresh_token, state) do
    refreshed_state =
      with {:ok, response} <- response_from_command(state) do
        state
        |> update_state_with_response(response)
        |> schedule_refresh(response)
      else
        {:error, _reason} -> schedule_refresh(state)
      end

    {:noreply, refreshed_state}
  end
113

114
  # Runs the exec command and builds the `handle_call/3` reply.
  #
  # On success the state is updated and a refresh is scheduled; the token is
  # only handed out if it is still valid at that point. On failure the error
  # tuple returned by `response_from_command/1` is passed through unchanged
  # (always `{:error, reason}`) and the cached token is dropped.
  @spec generate_new_token(State.t()) ::
          {:reply, {:ok, binary()}, State.t()}
          | {:reply, {:error, Jason.DecodeError.t() | Error.t() | atom()}, State.t()}
  defp generate_new_token(%State{} = state) do
    case response_from_command(state) do
      {:ok, %{token: new_token} = response} ->
        new_state = state |> update_state_with_response(response) |> schedule_refresh(response)

        if valid_token?(new_state) do
          {:reply, {:ok, new_token}, new_state}
        else
          {:reply, {:error, %Error{message: "Expired Before Processing"}}, new_state}
        end

      {:error, _reason} = error ->
        # Keep the process running
        {:reply, error, %State{state | token: nil}}
    end
  end

  # Executes the configured command and parses its output.
  #
  # Returns `{:ok, response}` only when the command exits with status 0, its
  # output is valid JSON, and the parsed credential carries a non-nil token.
  # Every failure is normalized to a single-level `{:error, reason}` tuple so
  # callers never have to unwrap nested tuples.
  @spec response_from_command(State.t()) ::
          {:ok, %{token: String.t(), expiration_timestamp: DateTime.t() | nil}}
          | {:error, Jason.DecodeError.t() | Error.t() | atom()}
  defp response_from_command(%{command: command, args: args, env: env} = _config) do
    # Execute the command by exec'ing the command with the args and env
    with {cmd_response, 0} <- System.cmd(command, args, env: env),
         # parse the binary response from the command
         {:ok, data} <- Jason.decode(cmd_response),
         # If there's a token in the response, return it
         {:ok, %{token: token} = response} when not is_nil(token) <- parse_cmd_response(data) do
      {:ok, response}
    else
      # The command exited with a non-zero status
      {cmd_response, err_code} when is_binary(cmd_response) and is_integer(err_code) ->
        msg = "#{__MODULE__} failed: #{cmd_response}"
        {:error, %Error{message: msg}}

      # The credential parsed fine but carries no token
      {:ok, %{token: nil}} ->
        msg = "#{__MODULE__} failed: ExecCredential has no token"
        {:error, %Error{message: msg}}

      # JSON decoding or credential parsing failed; already an error tuple
      {:error, _reason} = error ->
        error
    end
  end
156

157
  # Extracts the token and optional expiration from a decoded ExecCredential.
  #
  # A missing or null "expirationTimestamp" means the token has no expiry.
  # A string is parsed as ISO 8601; any other value is reported as an error
  # instead of crashing in `DateTime.from_iso8601/1`.
  @spec parse_cmd_response(map) ::
          {:ok, %{token: String.t() | nil, expiration_timestamp: DateTime.t() | nil}}
          | {:error, Error.t()}
          | {:error, atom()}
  defp parse_cmd_response(%{
         "kind" => "ExecCredential",
         "status" => %{"token" => token} = status
       }) do
    with {:ok, expiration_timestamp} <- parse_expiration(status["expirationTimestamp"]) do
      {:ok, %{token: token, expiration_timestamp: expiration_timestamp}}
    end
  end

  defp parse_cmd_response(_) do
    msg = "#{__MODULE__} failed: Unsupported ExecCredential"
    {:error, %Error{message: msg}}
  end

  # Parses the optional expiration timestamp of an ExecCredential status.
  @spec parse_expiration(any()) :: {:ok, DateTime.t() | nil} | {:error, Error.t() | atom()}
  defp parse_expiration(nil), do: {:ok, nil}

  defp parse_expiration(expire) when is_binary(expire) do
    case DateTime.from_iso8601(expire) do
      {:ok, expiration_timestamp, _offset} -> {:ok, expiration_timestamp}
      {:error, _reason} = error -> error
    end
  end

  defp parse_expiration(_invalid) do
    msg = "#{__MODULE__} failed: Invalid expirationTimestamp"
    {:error, %Error{message: msg}}
  end
183

184
  # Converts the kubeconfig env list (entries with "name" and "value") into a
  # name => value map; when a name repeats, the last entry wins.
  @spec format_env(list()) :: map()
  defp format_env(env) do
    for entry <- env, into: %{} do
      {entry["name"], entry["value"]}
    end
  end
10✔
186

187
  # A token is usable when it is present, non-empty, and either has no
  # expiration or its expiration is strictly in the future.
  @spec valid_token?(State.t()) :: boolean()
  defp valid_token?(%State{token: token, expiration_timestamp: expires_at}) do
    cond do
      token in [nil, ""] -> false
      is_nil(expires_at) -> true
      true -> DateTime.compare(DateTime.utc_now(), expires_at) == :lt
    end
  end
4✔
194

195
  # Copies the token and its expiration from a parsed command response into
  # the state; all other state fields are left untouched.
  @spec update_state_with_response(State.t(), map()) :: State.t()
  defp update_state_with_response(
         %State{} = state,
         %{token: _, expiration_timestamp: _} = response
       ) do
    credential_fields = Map.take(response, [:token, :expiration_timestamp])
    struct!(state, credential_fields)
  end
206

207
  # Arms the refresh timer using only the configured refresh interval.
  @spec schedule_refresh(State.t()) :: State.t()
  defp schedule_refresh(%State{} = state), do: restart_refresh_timer(state, nil)

  # Arms the refresh timer taking the response's expiration into account.
  @spec schedule_refresh(State.t(), map()) :: State.t()
  defp schedule_refresh(
         %State{} = state,
         %{expiration_timestamp: expiration_timestamp} = _response
       ) do
    restart_refresh_timer(state, expiration_timestamp)
  end

  # Cancels the pending refresh timer (if any) before starting a new one, so
  # at most one `:refresh_token` timer is outstanding at a time. Without the
  # cancel, every reschedule would leave the earlier timer running and the
  # number of refresh messages would grow with each regenerated token.
  @spec restart_refresh_timer(State.t(), DateTime.t() | nil) :: State.t()
  defp restart_refresh_timer(%State{timer: previous_timer} = state, expiration_timestamp) do
    if is_reference(previous_timer), do: Process.cancel_timer(previous_timer)

    timer =
      Process.send_after(
        self(),
        :refresh_token,
        refresh_delay(state, expiration_timestamp)
      )

    %State{state | timer: timer}
  end
233

234
  # Computes how many milliseconds to wait before the next refresh.
  #
  # Without an expiration the configured refresh interval is the upper bound.
  # With one, we wait up to the configured refresh interval or the time left
  # until expiration, whichever comes first.
  @spec refresh_delay(State.t(), DateTime.t() | nil) :: non_neg_integer()
  defp refresh_delay(%State{refresh_interval: interval}, nil), do: jittered_delay(interval)

  defp refresh_delay(%State{refresh_interval: interval}, time_stamp) do
    time_stamp
    |> DateTime.diff(DateTime.utc_now(), :millisecond)
    |> min(interval)
    |> jittered_delay()
  end

  # Rather than always picking the maximum, pick a random delay in the top 5%
  # below it for jitter. The lower bound is at least 1 ms and the upper bound
  # is kept above the lower one so the range is always ascending, even when
  # the maximum is tiny or negative (already expired).
  @spec jittered_delay(integer()) :: pos_integer()
  defp jittered_delay(max_interval) do
    min_interval = (max_interval * 0.95) |> round() |> max(1)
    Enum.random(min_interval..max(max_interval, min_interval + 1))
  end
253
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc