• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

coryodaniel / k8s / 807e93631268e5fd52ca29e3e4755088cf11bf27-PR-262

pending completion
807e93631268e5fd52ca29e3e4755088cf11bf27-PR-262

Pull #262

github

mruoss
add possibility to wait for delete
Pull Request #262: add possibility to wait for delete

6 of 6 new or added lines in 1 file covered. (100.0%)

732 of 1009 relevant lines covered (72.55%)

44.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.65
/lib/k8s/client/mint_http_provider.ex
1
defmodule K8s.Client.MintHTTPProvider do
2
  @moduledoc """
3
  Mint based `K8s.Client.Provider`
4
  """
5
  @behaviour K8s.Client.Provider
6

7
  alias K8s.Client.HTTPError
8
  alias K8s.Client.Mint.ConnectionRegistry
9
  alias K8s.Client.Mint.HTTPAdapter
10
  require Logger
11

12
  @data_types [:data, :stdout, :stderr, :error]
13

14
  # Performs the request and buffers the complete response before handing it
  # to `process_response/1`.
  #
  # The parts emitted by `stream/5` are folded into a map: `{:data, chunk}`
  # parts are accumulated, any other `{key, value}` part is stored under `key`
  # and any other bare term is stored as a `true` flag. `:done` markers are
  # skipped. Anything other than `{:ok, stream}` coming back from `stream/5`
  # is returned unchanged.
  @impl true
  def request(method, uri, body, headers, http_opts) do
    case stream(method, uri, body, headers, http_opts) do
      {:ok, parts} ->
        collected =
          Enum.reduce(parts, %{data: []}, fn
            :done, acc -> acc
            {:data, chunk}, acc -> %{acc | data: [chunk | acc.data]}
            {key, value}, acc -> Map.put(acc, key, value)
            flag, acc -> Map.put(acc, flag, true)
          end)

        # Chunks were prepended while reducing, so restore arrival order first.
        payload = collected.data |> Enum.reverse() |> IO.iodata_to_binary()
        process_response(%{collected | data: payload})

      failure ->
        failure
    end
  end
31

32
  # Starts the request and returns `{:ok, stream}` where `stream` lazily yields
  # the parts received from the HTTP adapter.
  #
  # The stream keeps calling `HTTPAdapter.recv/2` until a received batch
  # contains `:done`; that batch is still emitted before the stream halts.
  @impl true
  def stream(method, uri, body, headers, http_opts) do
    case do_stream_to(method, uri, body, headers, http_opts, nil) do
      {:error, reason} ->
        {:error, reason}

      {:ok, request_ref, adapter_pid} ->
        next_parts = fn
          :done ->
            {:halt, :ok}

          :pending ->
            received = HTTPAdapter.recv(adapter_pid, request_ref)
            # credo:disable-for-next-line
            following = if Enum.member?(received, :done), do: :done, else: :pending
            {received, following}
        end

        {:ok, Stream.resource(fn -> :pending end, next_parts, fn _state -> :ok end)}
    end
  end
58

59
  # Starts the request and has the adapter deliver the response parts to the
  # `stream_to` process. Returns `:ok` once the request was started; any other
  # result of `do_stream_to/6` is returned unchanged.
  @impl true
  def stream_to(method, uri, body, headers, http_opts, stream_to) do
    case do_stream_to(method, uri, body, headers, http_opts, stream_to) do
      {:ok, _request_ref, _adapter_pid} -> :ok
      failure -> failure
    end
  end
65

66
  # Checks out an adapter for `uri` from the connection registry and issues the
  # HTTP request on it. Returns the request reference together with the adapter
  # pid so callers can receive the response parts.
  #
  # Raises if `http_opts` has no `:ssl` entry (`Keyword.fetch!/2`).
  @spec do_stream_to(
          method :: atom(),
          uri :: URI.t(),
          body :: binary,
          headers :: list(),
          http_opts :: keyword(),
          stream_to :: pid() | nil
        ) :: {:ok, reference(), pid()} | {:error, HTTPError.t()}
  defp do_stream_to(method, uri, body, headers, http_opts, stream_to) do
    conn_opts = [transport_opts: Keyword.fetch!(http_opts, :ssl)]
    verb = method |> to_string() |> String.upcase()

    # Header names and values are passed on as strings.
    string_headers =
      Enum.map(headers, fn {name, value} -> {to_string(name), to_string(value)} end)

    request_path = uri_to_path(uri)

    with {:ok, %{adapter: adapter_pid, pool: pool}} <-
           ConnectionRegistry.checkout({uri, conn_opts}),
         {:ok, request_ref} <-
           HTTPAdapter.request(
             adapter_pid,
             verb,
             request_path,
             string_headers,
             body,
             pool,
             stream_to
           ) do
      {:ok, request_ref, adapter_pid}
    end
  end
86

87
  # Opens a websocket, consumes its stream until it ends and returns
  # `{:ok, response}` with everything that was received.
  #
  # Parts tagged with one of `@data_types` are collected per type and joined
  # into a single binary in arrival order. Every other `{key, value}` part is
  # stored as is (last one wins). Entries whose value is `nil` are dropped from
  # the result. Anything other than `{:ok, stream}` coming back from
  # `websocket_stream/3` is returned unchanged.
  @impl true
  def websocket_request(uri, headers, http_opts) do
    with {:ok, stream} <- websocket_stream(uri, headers, http_opts) do
      collected =
        Enum.reduce(stream, %{}, fn
          :done, acc ->
            acc

          {type, chunk}, acc when type in @data_types ->
            Map.update(acc, type, [chunk], &[chunk | &1])

          {key, value}, acc ->
            Map.put(acc, key, value)
        end)

      response =
        for {key, value} <- collected, not is_nil(value), into: %{} do
          if key in @data_types do
            # Chunks were prepended while collecting; restore arrival order.
            {key, value |> Enum.reverse() |> IO.iodata_to_binary()}
          else
            {key, value}
          end
        end

      {:ok, response}
    end
  end
111

112
  # Opens a websocket (registering the calling process as the receiver) and
  # returns `{:ok, stream}` where `stream` lazily yields the parts received
  # from the HTTP adapter.
  #
  # The stream keeps calling `HTTPAdapter.recv/2` until a received batch
  # contains a tuple whose first element is `:close`; that batch is still
  # emitted before the stream halts.
  @impl true
  def websocket_stream(uri, headers, http_opts) do
    case do_websocket_stream_to(uri, headers, http_opts, self()) do
      {:error, reason} ->
        {:error, reason}

      {:ok, request_ref, adapter_pid} ->
        next_parts = fn
          :done ->
            {:halt, :ok}

          :pending ->
            received = HTTPAdapter.recv(adapter_pid, request_ref)
            closed? = Enum.any?(received, fn part -> elem(part, 0) == :close end)
            # credo:disable-for-next-line
            following = if closed?, do: :done, else: :pending
            {received, following}
        end

        {:ok, Stream.resource(fn -> :pending end, next_parts, fn _state -> :ok end)}
    end
  end
141

142
  # Opens a websocket whose received parts are delivered to the `stream_to`
  # process. On success returns `{:ok, send_fun}` where `send_fun` is a
  # one-argument function that sends data to the websocket through the
  # adapter. Any other result of `do_websocket_stream_to/4` is returned
  # unchanged.
  @impl true
  def websocket_stream_to(uri, headers, http_opts, stream_to) do
    case do_websocket_stream_to(uri, headers, http_opts, stream_to) do
      {:ok, request_ref, adapter_pid} ->
        {:ok, &HTTPAdapter.websocket_send(adapter_pid, request_ref, &1)}

      failure ->
        failure
    end
  end
153

154
  # Checks out an adapter for `uri` from the connection registry, with the
  # connection restricted to the `:http1` protocol, and issues the websocket
  # request on it. Returns the request reference together with the adapter pid.
  #
  # Raises if `http_opts` has no `:ssl` entry (`Keyword.fetch!/2`).
  @spec do_websocket_stream_to(
          uri :: URI.t(),
          headers :: list(),
          http_opts :: keyword(),
          stream_to :: pid() | nil
        ) :: {:ok, reference(), pid()} | {:error, HTTPError.t()}
  defp do_websocket_stream_to(uri, headers, http_opts, stream_to) do
    conn_opts = [transport_opts: Keyword.fetch!(http_opts, :ssl), protocols: [:http1]]
    request_path = uri_to_path(uri)

    # Header names and values are passed on as strings.
    string_headers =
      Enum.map(headers, fn {name, value} -> {to_string(name), to_string(value)} end)

    with {:ok, %{adapter: adapter_pid, pool: pool}} <-
           ConnectionRegistry.checkout({uri, conn_opts}),
         {:ok, request_ref} <-
           HTTPAdapter.websocket_request(
             adapter_pid,
             request_path,
             string_headers,
             pool,
             stream_to
           ) do
      {:ok, request_ref, adapter_pid}
    end
  end
171

172
  # Turns a buffered response map into the provider's return value.
  #
  # Responses with a status in 400..599 become an error: a JSON body is decoded
  # and passed to `K8s.Client.APIError.from_kubernetes_error/1`, anything else
  # yields a generic `HTTPError` carrying the status code. All other responses
  # are decoded according to their content type and wrapped in `{:ok, body}`.
  @spec process_response(map()) :: K8s.Client.Provider.response_t()
  defp process_response(%{status: status} = response) when status in 400..599 do
    %{data: payload, headers: headers, status: code} = response

    if get_content_type(headers) == "application/json" do
      payload
      |> decode("application/json")
      |> K8s.Client.APIError.from_kubernetes_error()
    else
      {:error, HTTPError.new(message: "HTTP Error #{code}")}
    end
  end

  defp process_response(response) do
    media_type = get_content_type(response.headers)
    {:ok, decode(response.data, media_type)}
  end
193

194
  # Extracts the media type (without parameters such as `charset`) from the
  # `content-type` response header.
  #
  # `headers` is a list of `{name, value}` tuples; the header is looked up
  # under the exact name "content-type". Returns `nil` when it is absent.
  #
  # Media types are case-insensitive and may be followed by optional whitespace
  # before the ";" that introduces parameters, so the value is trimmed and
  # downcased. Otherwise "application/json ; charset=utf-8" or
  # "Application/JSON" would not be recognised by the callers that compare the
  # result against "application/json".
  @spec get_content_type([{binary(), binary()}]) :: binary() | nil
  defp get_content_type(headers) do
    case List.keyfind(headers, "content-type", 0) do
      {_name, value} ->
        value
        |> String.split(";", parts: 2)
        |> hd()
        |> String.trim()
        |> String.downcase()

      _ ->
        nil
    end
  end
201

202
  # Decodes a response body according to its content type.
  #
  # * "application/json" - the body is parsed with `Jason.decode/1`. If parsing
  #   fails, the failure is logged and `nil` is returned.
  # * anything else (including "text/plain" and a missing content type, i.e.
  #   `nil`) - the body is returned untouched. Without this catch-all a
  #   response lacking a `content-type` header, or carrying an unexpected one,
  #   would raise a `FunctionClauseError`.
  @spec decode(binary(), binary() | nil) :: map() | list() | binary() | nil
  defp decode(body, "application/json") do
    case Jason.decode(body) do
      {:ok, data} ->
        data

      {:error, error} ->
        Logger.error("The response body is supposed to be JSON but could not be decoded.",
          library: :k8s,
          error: error,
          body: body
        )

        nil
    end
  end

  defp decode(body, _content_type), do: body
220

221
  # Builds the request target (path plus optional query string) for `uri`.
  #
  # * A missing or empty path becomes "/", so the result is always a usable
  #   request target and a URI without a path no longer raises.
  # * The query is appended after a "?" only when it is present and non-empty.
  #   The query itself is passed through unchanged, so "?" characters that are
  #   part of it are kept.
  @spec uri_to_path(URI.t()) :: binary()
  defp uri_to_path(uri) do
    path = if uri.path in [nil, ""], do: "/", else: uri.path

    case uri.query do
      blank when blank in [nil, ""] -> path
      query -> path <> "?" <> query
    end
  end
231
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc