• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

coryodaniel / k8s / 6b716253eed1b5b0e09baffde7df811d60400251-PR-310

18 Mar 2024 03:23AM UTC coverage: 84.158% (-0.09%) from 84.249%
6b716253eed1b5b0e09baffde7df811d60400251-PR-310

Pull #310

github

web-flow
Bump castore from 1.0.5 to 1.0.6

Bumps [castore](https://github.com/elixir-mint/castore) from 1.0.5 to 1.0.6.
- [Commits](https://github.com/elixir-mint/castore/compare/v1.0.5...v1.0.6)

---
updated-dependencies:
- dependency-name: castore
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Pull Request #310: Bump castore from 1.0.5 to 1.0.6

919 of 1092 relevant lines covered (84.16%)

181.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.74
/lib/k8s/client/mint/http_adapter.ex
1
defmodule K8s.Client.Mint.HTTPAdapter do
2
  @moduledoc """
3
  The Mint client implementation. This module handles both, HTTP requests and
4
  websocket connections.
5

6
  ## Processes
7

8
  The module creates a process per connection to the Kubernetes API server.
9
  It supports `HTTP/2` for HTTP requests, but not for websockets. So while
10
  an open connection can process multiple requests (if the server supports
11
  `HTTP/2`), it can only process one single websocket connection.
12
  Therefore, each websocket connection is handled in its own process.
13

14
  ## State
15

16
  The module keeps track of the `Mint.HTTP` connection struct and a map of
17
  pending requests (`K8s.Client.Mint.Request`) for that connection, indexed by the
18
  `Mint.Types.request_ref()`.
19

20
  ### Requests
21

22
  Requests are calls to the GenServer that immediately return while the GenServer
23
  receives the response parts in the background. The way these response parts are
24
  returned depends on the `state_to` argument passed to `request/7` resp.
25
  `websocket_request/5`. See these function's docs for more details.
26
  """
27
  use GenServer, restart: :temporary
28

29
  alias K8s.Client.HTTPError
30
  alias K8s.Client.Mint.Request
31

32
  import K8s.Sys.Logger, only: [log_prefix: 1]
33
  require Logger
34
  require Mint.HTTP
35

36
  # healthcheck frequency in seconds
37
  @healthcheck_freq 30
38

39
  @type connection_args_t ::
40
          {scheme :: atom(), host :: binary(), port :: integer(), opts :: keyword()}
41

42
  @typedoc """
43
  Describes the state of the connection.
44

45
  - `:conn` - The current state of the `Mint` connection.
46
  - `:requests` - The opened requests over this connection (Only `HTTP/2` connections will hold multiple entries in this field.)
47
  """
48
  @type t :: %__MODULE__{
49
          conn: Mint.HTTP.t(),
50
          requests: %{reference() => Request.t()}
51
        }
52

53
  defstruct [:conn, requests: %{}]
54

55
  @doc """
56
  Opens a connection to Kubernetes, defined by `uri` and `opts`,
57
  and starts the GenServer.
58
  """
59
  @spec start_link(connection_args_t()) :: GenServer.on_start()
60
  def start_link(conn_args) do
61
    GenServer.start_link(__MODULE__, conn_args)
20✔
62
  end
63

64
  @doc """
65
  Returns connection arguments for the given `URI` and HTTP options.
66
  """
67
  @spec connection_args(URI.t(), keyword()) :: connection_args_t()
68
  def connection_args(uri, opts) do
69
    {String.to_atom(uri.scheme), uri.host, uri.port, opts}
516✔
70
  end
71

72
  @spec open?(GenServer.server(), :read | :write | :read_write) :: boolean()
73
  def open?(pid, type \\ :read_write) do
508✔
74
    GenServer.call(pid, {:open?, type})
508✔
75
  catch
76
    :exit, _ -> false
×
77
  end
78

79
  @doc """
80
  Starts a HTTP request. The way the response parts are returned depends on the
81
  `stream_to` argument passed to it.
82

83
    - `nil` - response parts are buffered. In order to receive them, the caller
84
      needs to `recv/2` passing it the `request_ref` returned by this function.
85
    - `pid` - response parts are sent to the process with the given `pid`.
86
    - `{pid, reference}` - response parts are sent to the process with the given
87
      `pid`. Messages are of the form `{reference, response_part}`.
88
  """
89
  @spec request(
90
          GenServer.server(),
91
          method :: binary(),
92
          path :: binary(),
93
          Mint.Types.headers(),
94
          body :: iodata() | nil | :stream,
95
          pool :: pid() | nil,
96
          stream_to :: pid() | nil
97
        ) :: {:ok, reference()} | {:error, HTTPError.t()}
98
  def request(pid, method, path, headers, body, pool, stream_to) do
99
    GenServer.call(pid, {:request, method, path, headers, body, pool, stream_to})
508✔
100
  end
101

102
  @doc """
103
  Upgrades to a Websocket connection. The way the frames are returned depends
104
  on the `stream_to` argument passed to it.
105

106
    - `nil` - frames are buffered. In order to receive them, the caller
107
      needs to `recv/2` passing it the `request_ref` returned by this function.
108
    - `pid` - frames are sent to the process with the given `pid`.
109
    - `{pid, reference}` - frames are sent to the process with the given
110
      `pid`. Messages are of the form `{reference, response_part}`.
111
  """
112
  @spec websocket_request(
113
          pid(),
114
          path :: binary(),
115
          Mint.Types.headers(),
116
          pool :: pid() | nil,
117
          stream_to :: pid() | nil
118
        ) :: {:ok, reference()} | {:error, HTTPError.t()}
119
  def websocket_request(pid, path, headers, pool, stream_to) do
120
    GenServer.call(pid, {:websocket_request, path, headers, pool, stream_to})
8✔
121
  end
122

123
  @doc """
124
  Returns response parts / frames that were buffered by the process. The
125
  `stream_to` must have been set to `nil` in `request/7` or
126
  `websocket_request/5`.
127

128
  If the buffer is empty, this call blocks until the next response part /
129
  frame is received.
130
  """
131
  @spec recv(GenServer.server(), reference()) :: list()
132
  def recv(pid, ref) do
133
    GenServer.call(pid, {:recv, ref}, :infinity)
2,080✔
134
  end
135

136
  @doc """
137
  Sends the given `data` throught the websocket specified by the `request_ref`.
138
  """
139
  @spec websocket_send(
140
          GenServer.server(),
141
          reference(),
142
          term()
143
        ) :: :ok
144
  def websocket_send(pid, request_ref, data) do
145
    GenServer.cast(pid, {:websocket_send, request_ref, data})
2✔
146
  end
147

148
  @impl true
149
  def init({scheme, host, port, opts}) do
150
    case Mint.HTTP.connect(scheme, host, port, opts) do
20✔
151
      {:ok, conn} ->
152
        Process.send_after(self(), :healthcheck, @healthcheck_freq * 1_000)
20✔
153
        state = %__MODULE__{conn: conn}
20✔
154
        {:ok, state}
155

156
      {:error, error} ->
157
        Logger.error(log_prefix("Failed initializing HTTPAdapter GenServer"), library: :k8s)
×
158
        {:stop, HTTPError.from_exception(error)}
159
    end
160
  end
161

162
  @impl true
163
  def handle_call({:open?, type}, _from, state) do
164
    {:reply, Mint.HTTP.open?(state.conn, type), state}
508✔
165
  end
166

167
  def handle_call({:request, method, path, headers, body, pool, stream_to}, from, state) do
168
    caller_ref = from |> elem(0) |> Process.monitor()
508✔
169
    conn = state.conn
508✔
170

171
    # For HTTP2, if the body is larger than the connection window, we've got to
172
    # stream it to the server.
173
    {body, pending_request_body} =
508✔
174
      cond do
175
        Mint.HTTP.protocol(conn) == :http1 -> {body, nil}
×
176
        is_nil(body) -> {nil, nil}
508✔
177
        :otherwise -> {:stream, body}
30✔
178
      end
179

180
    with {:ok, conn, request_ref} <- Mint.HTTP.request(conn, method, path, headers, body),
508✔
181
         {:request, request} <-
508✔
182
           {:request,
183
            Request.new(
184
              request_ref: request_ref,
185
              pool: pool,
186
              stream_to: stream_to,
187
              caller_ref: caller_ref,
188
              pending_request_body: pending_request_body
189
            )},
190
         {:ok, request, conn} <- Request.stream_request_body(request, conn) do
508✔
191
      state = put_in(state.requests[request_ref], request) |> struct!(conn: conn)
508✔
192
      {:reply, {:ok, request_ref}, state}
508✔
193
    else
194
      {:error, conn, error} ->
195
        state = struct!(state, conn: conn)
×
196

197
        {:reply, {:error, HTTPError.from_exception(error)}, state}
×
198
    end
199
  end
200

201
  def handle_call({:websocket_request, path, headers, pool, stream_to}, from, state) do
202
    caller_ref = from |> elem(0) |> Process.monitor()
8✔
203

204
    with {:ok, conn} <- Mint.HTTP.set_mode(state.conn, :passive),
8✔
205
         {:ok, conn, request_ref} <- Mint.WebSocket.upgrade(:wss, conn, path, headers),
8✔
206
         {:ok, conn, response} <- Request.receive_upgrade_response(conn, request_ref),
8✔
207
         {:ok, conn} <- Mint.HTTP.set_mode(conn, :active),
8✔
208
         {:ok, conn, websocket} <-
8✔
209
           Mint.WebSocket.new(conn, request_ref, response.status, response.headers) do
8✔
210
      state =
6✔
211
        put_in(
6✔
212
          state.requests[request_ref],
213
          Request.new(
214
            request_ref: request_ref,
215
            websocket: websocket,
216
            pool: pool,
217
            stream_to: stream_to,
218
            caller_ref: caller_ref
219
          )
220
        )
221

222
      send(stream_to, {:open, true})
6✔
223
      {:reply, {:ok, request_ref}, struct!(state, conn: conn)}
6✔
224
    else
225
      {:error, error} ->
226
        GenServer.reply(from, {:error, HTTPError.from_exception(error)})
×
227
        {:stop, :normal, state}
×
228

229
      {:error, conn, error} ->
230
        GenServer.reply(from, {:error, HTTPError.from_exception(error)})
2✔
231
        {:stop, :normal, struct!(state, conn: conn)}
2✔
232
    end
233
  end
234

235
  def handle_call({:recv, request_ref}, from, state) do
236
    {_, state} =
2,080✔
237
      get_and_update_in(
2,080✔
238
        state.requests[request_ref],
239
        &Request.recv(&1, from)
2,080✔
240
      )
241

242
    shutdown_if_closed(state)
2,080✔
243
  end
244

245
  @impl true
246
  def handle_cast({:websocket_send, request_ref, data}, state) do
247
    request = state.requests[request_ref]
2✔
248

249
    with {:ok, frame} <- Request.map_outgoing_frame(data),
2✔
250
         {:ok, websocket, data} <- Mint.WebSocket.encode(request.websocket, frame),
2✔
251
         {:ok, conn} <- Mint.WebSocket.stream_request_body(state.conn, request_ref, data) do
2✔
252
      state = struct!(state, conn: conn)
2✔
253
      state = put_in(state.requests[request_ref].websocket, websocket)
2✔
254
      {:noreply, state}
255
    end
256
  end
257

258
  @impl true
259
  def handle_info(message, %__MODULE__{conn: conn} = state)
260
      when Mint.HTTP.is_connection_message(conn, message) do
261
    with {:ok, conn, responses} <- Mint.WebSocket.stream(state.conn, message),
1,361✔
262
         {:ok, state} <- stream_pending_request_bodies(struct!(state, conn: conn)) do
1,356✔
263
      process_responses_or_frames(state, responses)
1,356✔
264
    else
265
      {:error, conn, %Mint.TransportError{reason: :closed}, []} ->
266
        Logger.debug("The connection was closed.", library: :k8s)
5✔
267

268
        shutdown_if_closed(struct!(state, conn: conn))
5✔
269

270
      {:error, conn, error} ->
271
        Logger.warning(
×
272
          log_prefix(
273
            "An error occurred when streaming the request body: #{Exception.message(error)}"
×
274
          ),
275
          error: error,
276
          library: :k8s
277
        )
278

279
        struct!(state, conn: conn)
×
280

281
      {:error, conn, error, responses} ->
282
        Logger.warning(
×
283
          log_prefix(
284
            "An error occurred when streaming the response: #{Exception.message(error)}"
×
285
          ),
286
          error: error,
287
          library: :k8s
288
        )
289

290
        state
291
        |> struct!(conn: conn)
292
        |> process_responses_or_frames(responses)
×
293
    end
294
  end
295

296
  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
297
    state =
4✔
298
      state.requests
4✔
299
      |> Map.filter(fn {_request_ref, request} -> request.caller_ref == ref end)
4✔
300
      |> Map.keys()
301
      |> Enum.reduce_while(state, fn
3✔
302
        request_ref, state ->
303
          case pop_in(state.requests[request_ref]) do
4✔
304
            {%Request{}, %{conn: %Mint.HTTP2{}} = state} ->
305
              conn = Mint.HTTP2.cancel_request(state.conn, request_ref) |> elem(1)
3✔
306
              {:cont, struct!(state, conn: conn)}
307

308
            {_, state} ->
1✔
309
              {:halt, {:stop, state}}
310
          end
311
      end)
312

313
    case state do
4✔
314
      {:stop, state} ->
315
        Logger.debug(
1✔
316
          log_prefix(
317
            "Received :DOWN signal from parent process. Terminating HTTPAdapter #{inspect(self())}."
318
          ),
319
          library: :k8s
320
        )
321

322
        {:stop, :normal, state}
1✔
323

324
      state ->
3✔
325
        {:noreply, state}
326
    end
327
  end
328

329
  # This is called regularly to check whether the connection is still open. If
330
  # it's not open, and all buffers are emptied, this process is considered
331
  # garbage and is stopped.
332
  def handle_info(:healthcheck, state) do
333
    if Mint.HTTP.open?(state.conn, :read) or any_non_empty_buffers?(state) do
×
334
      Process.send_after(self(), :healthcheck, @healthcheck_freq * 1000)
×
335
      {:noreply, state}
336
    else
337
      Logger.warning(
×
338
        log_prefix("Connection closed for reading and writing - stopping this process."),
339
        library: :k8s
340
      )
341

342
      {:stop, {:shutdown, :closed}, state}
×
343
    end
344
  end
345

346
  @impl true
347
  def terminate(reason, state) do
348
    state = state
8✔
349

350
    state.requests
8✔
351
    |> Enum.each(fn
8✔
352
      {_request_ref, request} when is_nil(request.websocket) ->
353
        Request.put_response(
×
354
          request,
355
          {:error, reason}
356
        )
357

358
      {request_ref, request} ->
359
        {:ok, _websocket, data} = Mint.WebSocket.encode(request.websocket, :close)
×
360
        Mint.WebSocket.stream_request_body(state.conn, request_ref, data)
×
361
    end)
362

363
    Mint.HTTP.close(state.conn)
8✔
364

365
    Logger.debug(log_prefix("Terminating HTTPAdapter GenServer #{inspect(self())}"),
8✔
366
      library: :k8s
367
    )
368

369
    :ok
370
  end
371

372
  @spec stream_pending_request_bodies(t()) ::
373
          {:ok, t()} | {:error, Mint.HTTP.t(), Mint.Types.error()}
374
  defp stream_pending_request_bodies(state) do
375
    stream_pending_request_bodies(state, Map.values(state.requests))
1,356✔
376
  end
377

378
  @spec stream_pending_request_bodies(t(), [Request.t()]) ::
379
          {:ok, t()} | {:error, Mint.HTTP.t(), Mint.Types.error()}
380
  defp stream_pending_request_bodies(state, []) do
1,356✔
381
    {:ok, state}
382
  end
383

384
  defp stream_pending_request_bodies(state, [request | rest]) do
385
    case Request.stream_request_body(
3,476✔
386
           request,
387
           state.conn
3,476✔
388
         ) do
389
      {:ok, request, conn} ->
390
        state =
3,476✔
391
          put_in(state.requests[request.request_ref], request)
3,476✔
392
          |> struct!(conn: conn)
393

394
        stream_pending_request_bodies(state, rest)
3,476✔
395

396
      {:error, conn, error} ->
397
        {:error, conn, error}
×
398
    end
399
  end
400

401
  @spec process_responses_or_frames(t(), [Mint.Types.response()]) :: {:noreply, t()}
402
  defp process_responses_or_frames(state, [{:data, request_ref, data}] = responses) do
403
    request = state.requests[request_ref]
394✔
404

405
    if is_nil(request.websocket) do
394✔
406
      process_responses(state, responses)
380✔
407
    else
408
      {:ok, websocket, frames} = Mint.WebSocket.decode(request.websocket, data)
14✔
409
      state = put_in(state.requests[request_ref].websocket, websocket)
14✔
410
      process_frames(state, request_ref, frames)
14✔
411
    end
412
  end
413

414
  defp process_responses_or_frames(state, responses) do
415
    process_responses(state, responses)
962✔
416
  end
417

418
  @spec process_frames(t(), reference(), list(Mint.WebSocket.frame())) :: {:noreply, t()}
419
  defp process_frames(state, request_ref, frames) do
420
    state =
14✔
421
      frames
422
      |> Enum.map(&Request.map_frame/1)
423
      |> Enum.reduce(state, fn mapped_frame, state ->
424
        {_, state} =
16✔
425
          get_and_update_in(
16✔
426
            state.requests[request_ref],
427
            &Request.put_response(&1, mapped_frame)
16✔
428
          )
429

430
        state
16✔
431
      end)
432

433
    {:noreply, state}
434
  end
435

436
  @spec process_responses(t(), [Mint.Types.response()]) :: {:noreply, t()}
437
  defp process_responses(state, responses) do
438
    state =
1,342✔
439
      responses
440
      |> Enum.map(&Request.map_response/1)
441
      |> Enum.reduce(state, fn {mapped_response, request_ref}, state ->
442
        {_, state} =
3,038✔
443
          get_and_update_in(
3,038✔
444
            state.requests[request_ref],
445
            &Request.put_response(&1, mapped_response)
3,038✔
446
          )
447

448
        state
3,038✔
449
      end)
450

451
    {:noreply, state}
452
  end
453

454
  @spec shutdown_if_closed(t()) :: {:noreply, t()} | {:stop, {:shutdown, :closed}, t()}
455
  defp shutdown_if_closed(state) do
456
    if Mint.HTTP.open?(state.conn, :read) or any_non_empty_buffers?(state) do
2,085✔
457
      {:noreply, state}
458
    else
459
      Logger.warning(
5✔
460
        log_prefix("Connection closed for reading and writing - stopping this process."),
461
        library: :k8s
462
      )
463

464
      {:stop, {:shutdown, :closed}, state}
5✔
465
    end
466
  end
467

468
  @spec any_non_empty_buffers?(t()) :: boolean()
469
  defp any_non_empty_buffers?(state) do
470
    Enum.any?(state.requests, fn {_, request} -> request.buffer != [] end)
7✔
471
  end
472
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc