• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

coryodaniel / k8s / 807e93631268e5fd52ca29e3e4755088cf11bf27-PR-262

pending completion
807e93631268e5fd52ca29e3e4755088cf11bf27-PR-262

Pull #262

github

mruoss
add possibility to wait for delete
Pull Request #262: add possibility to wait for delete

6 of 6 new or added lines in 1 file covered. (100.0%)

732 of 1009 relevant lines covered (72.55%)

44.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.67
/lib/k8s/client/mint/http_adapter.ex
1
defmodule K8s.Client.Mint.HTTPAdapter do
2
  @moduledoc """
3
  The Mint client implementation. This module handles both HTTP requests and
4
  websocket connections.
5

6
  ## Processes
7

8
  The module creates a process per connection to the Kubernetes API server.
9
  It supports `HTTP/2` for HTTP requests, but not for websockets. So while
10
  an open connection can process multiple requests (if the server supports
11
  `HTTP/2`), it can only process one single websocket connection.
12
  Therefore, each websocket connection is handled in its own process.
13

14
  ## State
15

16
  The module keeps track of the `Mint.HTTP` connection struct and a map of
17
  pending requests (`K8s.Client.Mint.Request`) for that connection, indexed by the
18
  `Mint.Types.request_ref()`.
19

20
  ### Requests
21

22
  Requests are calls to the GenServer that immediately return while the GenServer
23
  receives the response parts in the background. The way these response parts are
24
  returned depends on the `stream_to` argument passed to `request/7` resp.
25
  `websocket_request/5`. See these functions' docs for more details.
26
  """
27
  use GenServer, restart: :temporary
28

29
  alias K8s.Client.HTTPError
30
  alias K8s.Client.Mint.Request
31

32
  import K8s.Sys.Logger, only: [log_prefix: 1]
33
  require Logger
34
  require Mint.HTTP
35

36
  # healthcheck frequency in seconds
37
  @healthcheck_freq 30
38

39
  @type connection_args_t ::
40
          {scheme :: atom(), host :: binary(), port :: integer(), opts :: keyword()}
41

42
  @typedoc """
43
  Describes the state of the connection.
44

45
  - `:conn` - The current state of the `Mint` connection.
46
  - `:requests` - The opened requests over this connection (Only `HTTP/2` connections will hold multiple entries in this field.)
47
  """
48
  @type t :: %__MODULE__{
49
          conn: Mint.HTTP.t(),
50
          requests: %{reference() => Request.t()}
51
        }
52

53
  defstruct [:conn, requests: %{}]
54

55
  @doc """
  Starts the GenServer and links it to the calling process.

  `conn_args` is the `{scheme, host, port, opts}` tuple (see
  `connection_args/2`). It is handed to `init/1`, which uses it to open the
  connection to Kubernetes.
  """
  @spec start_link(connection_args_t()) :: GenServer.on_start()
  def start_link(conn_args), do: GenServer.start_link(__MODULE__, conn_args)
63

64
  @doc """
  Returns connection arguments for the given `URI` and HTTP options.

  The result is a `{scheme, host, port, opts}` tuple in which `scheme` is the
  URI scheme converted to an atom. `opts` is passed through unchanged.

  The schemes `"http"` and `"https"` are mapped directly. Any other scheme is
  only converted if the corresponding atom already exists, so arbitrary input
  cannot grow the atom table. An `ArgumentError` is raised otherwise.
  """
  @spec connection_args(URI.t(), keyword()) :: connection_args_t()
  def connection_args(uri, opts) do
    {scheme_to_atom(uri.scheme), uri.host, uri.port, opts}
  end

  # Converts a URI scheme to an atom without creating new atoms at runtime.
  defp scheme_to_atom("https"), do: :https
  defp scheme_to_atom("http"), do: :http
  defp scheme_to_atom(scheme) when is_binary(scheme), do: String.to_existing_atom(scheme)
71

72
  @doc """
  Asks the process `pid` whether its connection is open for `type`
  (`:read`, `:write` or `:read_write`, which is the default).

  Returns `false` if the call to the process exits.
  """
  @spec open?(GenServer.server(), :read | :write | :read_write) :: boolean()
  def open?(pid, type \\ :read_write) do
    try do
      GenServer.call(pid, {:open?, type})
    catch
      :exit, _reason -> false
    end
  end
78

79
  @doc """
  Starts a HTTP request. The way the response parts are returned depends on the
  `stream_to` argument passed to it.

    - `nil` - response parts are buffered. In order to receive them, the caller
      needs to `recv/2` passing it the `request_ref` returned by this function.
    - `pid` - response parts are sent to the process with the given `pid`.
    - `{pid, reference}` - response parts are sent to the process with the given
      `pid`. Messages are of the form `{reference, response_part}`.

  Returns `{:ok, request_ref}` if the request was started, or
  `{:error, error}` otherwise.
  """
  @spec request(
          GenServer.server(),
          method :: binary(),
          path :: binary(),
          Mint.Types.headers(),
          body :: iodata() | nil | :stream,
          pool :: pid() | nil,
          stream_to :: pid() | {pid(), reference()} | nil
        ) :: {:ok, reference()} | {:error, HTTPError.t()}
  def request(pid, method, path, headers, body, pool, stream_to) do
    GenServer.call(pid, {:request, method, path, headers, body, pool, stream_to})
  end
101

102
  @doc """
  Upgrades the connection to a websocket connection. How the frames are
  delivered depends on the `stream_to` argument:

    - `nil` - frames are buffered. In order to receive them, the caller
      needs to `recv/2` passing it the `request_ref` returned by this function.
    - `pid` - frames are sent to the process with the given `pid`.
    - `{pid, reference}` - frames are sent to the process with the given
      `pid`. Messages are of the form `{reference, response_part}`.
  """
  @spec websocket_request(
          pid(),
          path :: binary(),
          Mint.Types.headers(),
          pool :: pid() | nil,
          stream_to :: pid() | nil
        ) :: {:ok, reference()} | {:error, HTTPError.t()}
  def websocket_request(pid, path, headers, pool, stream_to) do
    message = {:websocket_request, path, headers, pool, stream_to}
    GenServer.call(pid, message)
  end
122

123
  @doc """
  Returns the response parts / frames the process buffered for the request
  identified by `ref`. This requires `stream_to` to have been `nil` in
  `request/7` or `websocket_request/5`.

  The call uses an `:infinity` timeout: if the buffer is empty, it blocks until
  the next response part / frame is received.
  """
  @spec recv(GenServer.server(), reference()) :: list()
  def recv(pid, ref), do: GenServer.call(pid, {:recv, ref}, :infinity)
135

136
  @doc """
  Sends the given `data` through the websocket specified by the `request_ref`.

  This is an asynchronous cast and therefore always returns `:ok`.
  """
  @spec websocket_send(
          GenServer.server(),
          reference(),
          term()
        ) :: :ok
  def websocket_send(pid, request_ref, data) do
    message = {:websocket_send, request_ref, data}
    GenServer.cast(pid, message)
  end
147

148
  # Opens the Mint connection. On success the first healthcheck is scheduled
  # and the connection becomes the initial state. On failure the error is
  # logged and the process stops with the error converted to an `HTTPError`.
  @impl true
  def init({scheme, host, port, opts}) do
    with {:ok, conn} <- Mint.HTTP.connect(scheme, host, port, opts) do
      Process.send_after(self(), :healthcheck, @healthcheck_freq * 1_000)
      {:ok, %__MODULE__{conn: conn}}
    else
      {:error, error} ->
        Logger.error(log_prefix("Failed initializing HTTPAdapter GenServer"), library: :k8s)
        {:stop, HTTPError.from_exception(error)}
    end
  end
161

162
  # Replies whether the Mint connection is open for the given `type`.
  @impl true
  def handle_call({:open?, type}, _from, state) do
    open? = Mint.HTTP.open?(state.conn, type)
    {:reply, open?, state}
  end
166

167
  # Starts a HTTP request on the connection and registers it in
  # `state.requests` under the request reference returned by Mint.
  #
  # Replies `{:ok, request_ref}` on success and `{:error, HTTPError.t()}` if the
  # request could not be sent.
  def handle_call({:request, method, path, headers, body, pool, stream_to}, from, state) do
    # Monitor the caller so the request can be cleaned up by the `:DOWN` clause
    # of `handle_info/2` if the caller goes away.
    {caller_pid, _tag} = from
    caller_ref = Process.monitor(caller_pid)
    conn = state.conn

    # For HTTP2, if the body is larger than the connection window, we've got to
    # stream it to the server.
    {body, pending_request_body} =
      cond do
        Mint.HTTP.protocol(conn) == :http1 -> {body, nil}
        is_nil(body) -> {nil, nil}
        true -> {:stream, body}
      end

    with {:ok, conn, request_ref} <- Mint.HTTP.request(conn, method, path, headers, body),
         request =
           Request.new(
             request_ref: request_ref,
             pool: pool,
             stream_to: stream_to,
             caller_ref: caller_ref,
             pending_request_body: pending_request_body
           ),
         {:ok, request, conn} <- Request.stream_request_body(request, conn) do
      state = put_in(state.requests[request_ref], request) |> struct!(conn: conn)
      {:reply, {:ok, request_ref}, state}
    else
      {:error, conn, error} ->
        # No request was registered for this monitor, so nothing would ever
        # match its `:DOWN` message. Release it instead of leaking it.
        Process.demonitor(caller_ref, [:flush])

        {:reply, {:error, HTTPError.from_exception(error)}, struct!(state, conn: conn)}
    end
  end
200

201
  # Upgrades the connection to a websocket and registers the resulting request
  # in `state.requests`.
  #
  # The connection is switched to passive mode while waiting for the upgrade
  # response and back to active mode afterwards. If any step fails, the caller
  # gets `{:error, HTTPError.t()}` and this process stops normally.
  def handle_call({:websocket_request, path, headers, pool, stream_to}, from, state) do
    caller_ref = from |> elem(0) |> Process.monitor()

    with {:ok, conn} <- Mint.HTTP.set_mode(state.conn, :passive),
         {:ok, conn, request_ref} <- Mint.WebSocket.upgrade(:wss, conn, path, headers),
         {:ok, conn, response} <- Request.receive_upgrade_response(conn, request_ref),
         {:ok, conn} <- Mint.HTTP.set_mode(conn, :active),
         {:ok, conn, websocket} <-
           Mint.WebSocket.new(conn, request_ref, response.status, response.headers) do
      request =
        Request.new(
          request_ref: request_ref,
          websocket: websocket,
          pool: pool,
          stream_to: stream_to,
          caller_ref: caller_ref
        )

      state = put_in(state.requests[request_ref], request)

      # Tell the receiver that the websocket is open. `stream_to` may be `nil`
      # (buffered mode), in which case there is nobody to notify and
      # `send(nil, ...)` would raise.
      case stream_to do
        nil ->
          :ok

        {pid, ref} when is_pid(pid) and is_reference(ref) ->
          # Message form documented for `{pid, reference}` receivers.
          send(pid, {ref, {:open, true}})

        dest ->
          send(dest, {:open, true})
      end

      {:reply, {:ok, request_ref}, struct!(state, conn: conn)}
    else
      {:error, error} ->
        GenServer.reply(from, {:error, HTTPError.from_exception(error)})
        {:stop, :normal, state}

      {:error, conn, error} ->
        GenServer.reply(from, {:error, HTTPError.from_exception(error)})
        {:stop, :normal, struct!(state, conn: conn)}
    end
  end
234

235
  # Hands the caller (`from`) over to `Request.recv/2` for the request stored
  # under `request_ref`. No reply is sent from here.
  def handle_call({:recv, request_ref}, from, state) do
    {_result, new_state} =
      get_and_update_in(state.requests[request_ref], fn request ->
        Request.recv(request, from)
      end)

    {:noreply, new_state}
  end
244

245
  # Encodes `data` as a websocket frame and sends it over the websocket that
  # belongs to `request_ref`.
  #
  # Failures are logged and the process keeps running. Previously an unknown
  # `request_ref` raised, and any failing step returned its error tuple from
  # the callback, which is not a valid `handle_cast/2` return value.
  @impl true
  def handle_cast({:websocket_send, request_ref, data}, state) do
    case state.requests[request_ref] do
      %{websocket: websocket} when not is_nil(websocket) ->
        send_websocket_frame(state, request_ref, websocket, data)

      _unknown_or_not_a_websocket ->
        Logger.warning(
          log_prefix("No open websocket found for the given request - dropping the data."),
          library: :k8s
        )

        {:noreply, state}
    end
  end

  # Maps, encodes and streams one outgoing websocket frame. Always returns
  # `{:noreply, state}` with the most recent websocket and connection stored.
  defp send_websocket_frame(state, request_ref, websocket, data) do
    with {:ok, frame} <- Request.map_outgoing_frame(data),
         {:ok, websocket, payload} <- Mint.WebSocket.encode(websocket, frame) do
      state = put_in(state.requests[request_ref].websocket, websocket)

      case Mint.WebSocket.stream_request_body(state.conn, request_ref, payload) do
        {:ok, conn} ->
          {:noreply, struct!(state, conn: conn)}

        {:error, conn, error} ->
          Logger.warning(
            log_prefix("Failed sending data through the websocket: #{inspect(error)}"),
            library: :k8s
          )

          {:noreply, struct!(state, conn: conn)}
      end
    else
      # Encoding failed; keep the websocket returned alongside the error.
      {:error, websocket, error} when is_struct(websocket, Mint.WebSocket) ->
        Logger.warning(
          log_prefix("Failed encoding the websocket frame: #{inspect(error)}"),
          library: :k8s
        )

        {:noreply, put_in(state.requests[request_ref].websocket, websocket)}

      error ->
        Logger.warning(
          log_prefix("Failed sending data through the websocket: #{inspect(error)}"),
          library: :k8s
        )

        {:noreply, state}
    end
  end
257

258
  # Handles messages that belong to the Mint connection: decodes them into
  # responses, streams pending request bodies and forwards the responses /
  # frames to the respective requests.
  @impl true
  def handle_info(message, %__MODULE__{conn: conn} = state)
      when Mint.HTTP.is_connection_message(conn, message) do
    with {:ok, conn, responses} <- Mint.WebSocket.stream(state.conn, message),
         {:ok, state} <- stream_pending_request_bodies(struct!(state, conn: conn)) do
      process_responses_or_frames(state, responses)
    else
      {:error, conn, %Mint.TransportError{reason: :closed}, []} ->
        Logger.debug("The connection was closed.", library: :k8s)

        # We could terminate the process here. But there might still be chunks
        # in the buffer, so we let the healthcheck take care of that.
        {:noreply, struct!(state, conn: conn)}

      {:error, conn, error} ->
        Logger.warning(
          log_prefix(
            "An error occurred when streaming the request body: #{Exception.message(error)}"
          ),
          error: error,
          library: :k8s
        )

        # This branch used to return the bare state struct, which is not a
        # valid `handle_info/2` return value.
        # NOTE(review): the responses decoded from `message` are not processed
        # on this path - confirm whether that is intended.
        {:noreply, struct!(state, conn: conn)}

      {:error, conn, error, responses} ->
        Logger.warning(
          log_prefix(
            "An error occurred when streaming the response: #{Exception.message(error)}"
          ),
          error: error,
          library: :k8s
        )

        state
        |> struct!(conn: conn)
        |> process_responses_or_frames(responses)
    end
  end
297

298
  # A monitored caller went down. All requests started by that caller are
  # removed from the state. On a HTTP/2 connection each of them is cancelled and
  # the process keeps running; on any other connection the process stops.
  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    orphaned_request_refs =
      state.requests
      |> Map.filter(fn {_request_ref, request} -> request.caller_ref == ref end)
      |> Map.keys()

    outcome =
      Enum.reduce_while(orphaned_request_refs, {:ok, state}, fn request_ref, {:ok, acc} ->
        case pop_in(acc.requests[request_ref]) do
          {%Request{}, %{conn: %Mint.HTTP2{}} = remaining} ->
            # Only the connection is of interest, whatever the cancellation result.
            conn = remaining.conn |> Mint.HTTP2.cancel_request(request_ref) |> elem(1)
            {:cont, {:ok, struct!(remaining, conn: conn)}}

          {_popped, remaining} ->
            {:halt, {:stop, remaining}}
        end
      end)

    case outcome do
      {:ok, new_state} ->
        {:noreply, new_state}

      {:stop, new_state} ->
        Logger.debug(
          log_prefix(
            "Received :DOWN signal from parent process. Terminating HTTPAdapter #{inspect(self())}."
          ),
          library: :k8s
        )

        {:stop, :normal, new_state}
    end
  end
330

331
  # Periodic healthcheck. As long as the connection is open or any request
  # still holds buffered data, the next healthcheck is scheduled. Otherwise the
  # process is considered garbage and stops with reason `:closed`.
  def handle_info(:healthcheck, state) do
    buffered_data? =
      state.requests
      |> Map.values()
      |> Enum.any?(&(&1.buffer != []))

    keep_alive? = Mint.HTTP.open?(state.conn) or buffered_data?

    if keep_alive? do
      Process.send_after(self(), :healthcheck, @healthcheck_freq * 1_000)
      {:noreply, state}
    else
      Logger.warning(
        log_prefix("Connection closed for reading and writing - stopping this process."),
        library: :k8s
      )

      {:stop, :closed, state}
    end
  end
350

351
  # Cleans up when the process terminates:
  #
  #   - plain HTTP requests receive `{:error, reason}` as response,
  #   - websockets are sent a `:close` frame (best effort),
  #   - the Mint connection is closed.
  @impl true
  def terminate(reason, state) do
    Enum.each(state.requests, fn
      {_request_ref, request} when is_nil(request.websocket) ->
        Request.put_response(request, {:error, reason})

      {request_ref, request} ->
        # Best effort: a close frame that fails to encode must not keep the
        # remaining requests and the connection from being cleaned up.
        with {:ok, _websocket, data} <- Mint.WebSocket.encode(request.websocket, :close) do
          Mint.WebSocket.stream_request_body(state.conn, request_ref, data)
        end
    end)

    Mint.HTTP.close(state.conn)

    Logger.debug(log_prefix("Terminating HTTPAdapter GenServer #{inspect(self())}"),
      library: :k8s
    )

    :ok
  end
376

377
  # Calls `Request.stream_request_body/2` for every request currently held in
  # the state, threading the updated connection through. Stops at the first
  # error and returns it together with the connection at that point.
  @spec stream_pending_request_bodies(t()) ::
          {:ok, t()} | {:error, Mint.HTTP.t(), Mint.Types.error()}
  defp stream_pending_request_bodies(state) do
    state.requests
    |> Map.values()
    |> Enum.reduce_while({:ok, state}, fn pending, {:ok, acc} ->
      case Request.stream_request_body(pending, acc.conn) do
        {:ok, request, conn} ->
          acc =
            put_in(acc.requests[request.request_ref], request)
            |> struct!(conn: conn)

          {:cont, {:ok, acc}}

        {:error, conn, error} ->
          {:halt, {:error, conn, error}}
      end
    end)
  end
405

406
  # Dispatches incoming responses. A lone `:data` response for a request that
  # holds a websocket is decoded into frames; everything else is treated as
  # plain HTTP response parts.
  @spec process_responses_or_frames(t(), [Mint.Types.response()]) :: {:noreply, t()}
  defp process_responses_or_frames(state, [{:data, request_ref, data}] = responses) do
    case state.requests[request_ref].websocket do
      nil ->
        process_responses(state, responses)

      websocket ->
        {:ok, websocket, frames} = Mint.WebSocket.decode(websocket, data)

        state.requests[request_ref].websocket
        |> put_in(websocket)
        |> process_frames(request_ref, frames)
    end
  end

  defp process_responses_or_frames(state, responses), do: process_responses(state, responses)
422

423
  # Maps the decoded websocket frames and hands each of them, in order, to the
  # request stored under `request_ref`.
  @spec process_frames(t(), reference(), list(Mint.WebSocket.frame())) :: {:noreply, t()}
  defp process_frames(state, request_ref, frames) do
    mapped_frames = for frame <- frames, do: Request.map_frame(frame)

    new_state =
      Enum.reduce(mapped_frames, state, fn mapped_frame, acc ->
        {_result, acc} =
          get_and_update_in(acc.requests[request_ref], fn request ->
            Request.put_response(request, mapped_frame)
          end)

        acc
      end)

    {:noreply, new_state}
  end
440

441
  # Maps the HTTP response parts and hands each of them, in order, to the
  # request it belongs to.
  @spec process_responses(t(), [Mint.Types.response()]) :: {:noreply, t()}
  defp process_responses(state, responses) do
    mapped_responses = for response <- responses, do: Request.map_response(response)

    new_state =
      Enum.reduce(mapped_responses, state, fn {mapped_response, request_ref}, acc ->
        {_result, acc} =
          get_and_update_in(acc.requests[request_ref], fn request ->
            Request.put_response(request, mapped_response)
          end)

        acc
      end)

    {:noreply, new_state}
  end
458
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc