• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

coryodaniel / k8s / 807e93631268e5fd52ca29e3e4755088cf11bf27-PR-262

pending completion
807e93631268e5fd52ca29e3e4755088cf11bf27-PR-262

Pull #262

github

mruoss
add possibility to wait for delete
Pull Request #262: add possibility to wait for delete

6 of 6 new or added lines in 1 file covered. (100.0%)

732 of 1009 relevant lines covered (72.55%)

44.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.58
/lib/k8s/client/mint/request.ex
1
defmodule K8s.Client.Mint.Request do
2
  @moduledoc """
3
  Maintains the state of a HTTP or Websocket request.
4
  """
5

6
  alias K8s.Client.Mint.ConnectionRegistry
7

8
  @typedoc """
9
  Describes the mode the request is currently in.
10

11
  - `::pending` - The request is still streaming its body to the server
12
  - `:receiving` - The request is currently receiving response parts / frames
13
  - `:closing` - Websocket requests only: The `:close` frame was received but the process wasn't terminated yet
14
  - `:terminating` - HTTP requests only: The `:done` part was received but the request isn't cleaned up yet
15
  """
16
  @type request_modes :: :pending | :receiving | :closing | :terminating
17

18
  @typedoc """
19
  Defines the state of the request.
20

21
  - `:request_ref` - Mint request reference
22
  - `:caller_ref` - Monitor reference of the calling process.
23
  - `:stream_to` - the process expecting response parts sent to.
24
  - `:pool` - the PID of the pool so we can checkin after the last part is sent.
25
  - `:websocket` - for WebSocket requests: The websocket state (`Mint.WebSocket.t()`).
26
  - `:mode` - defines what mode the request is currently in.
27
  - `:buffer` - Holds the buffered response parts or frames that haven't been
28
    sent to / received by the caller yet
29
  - `:pending_request_body` - Part of the request body that has not been sent yet.
30
  """
31
  @type t :: %__MODULE__{
32
          request_ref: Mint.Types.request_ref(),
33
          caller_ref: reference(),
34
          stream_to: pid() | {pid(), reference()} | nil,
35
          pool: pid() | nil,
36
          websocket: Mint.WebSocket.t() | nil,
37
          mode: request_modes(),
38
          buffer: list(),
39
          pending_request_body: binary()
40
        }
41

42
  defstruct [
43
    :request_ref,
44
    :caller_ref,
45
    :stream_to,
46
    :pool,
47
    :websocket,
48
    :pending_request_body,
49
    mode: :pending,
50
    buffer: []
51
  ]
52

53
  @spec new(keyword()) :: t()
54
  def new(fields) do
55
    mode = if is_nil(fields[:pending_request_body]), do: :receiving, else: :pending
137✔
56
    fields = Keyword.put(fields, :mode, mode)
137✔
57
    struct!(__MODULE__, fields)
137✔
58
  end
59

60
  @spec put_response(t(), :done | {atom(), any()}) :: :pop | {t(), t()}
61
  def put_response(request, response) do
62
    request
63
    |> struct!(buffer: [response | request.buffer])
594✔
64
    |> update_mode(response)
65
    |> send_response()
66
    |> maybe_terminate_request()
594✔
67
  end
68

69
  @spec recv(t(), GenServer.from()) :: :pop | {t(), t()}
70
  def recv(request, from) do
71
    request
72
    |> struct!(stream_to: {:reply, from})
73
    |> send_response()
74
    |> maybe_terminate_request()
395✔
75
  end
76

77
  @spec update_mode(t(), :done | {atom(), term()}) :: t()
78
  defp update_mode(%__MODULE__{mode: mode} = request, _) when mode != :receiving, do: request
×
79

80
  defp update_mode(request, {:close, _}) do
81
    struct!(request, mode: :closing)
1✔
82
  end
83

84
  defp update_mode(request, :done) do
85
    struct!(request, mode: :terminating)
136✔
86
  end
87

88
  defp update_mode(request, _), do: request
457✔
89

90
  @spec maybe_terminate_request(t()) :: {t(), t()} | :pop
91
  def maybe_terminate_request(%__MODULE__{mode: :closing, buffer: []}), do: :pop
1✔
92

93
  def maybe_terminate_request(%__MODULE__{mode: :terminating, buffer: []} = request) do
94
    Process.demonitor(request.caller_ref)
136✔
95
    ConnectionRegistry.checkin(%{pool: request.pool, adapter: self()})
136✔
96
    :pop
97
  end
98

99
  def maybe_terminate_request(request), do: {request, request}
852✔
100

101
  @spec send_response(t()) :: t()
102
  defp send_response(%__MODULE__{stream_to: nil} = request) do
103
    request
391✔
104
  end
105

106
  defp send_response(%__MODULE__{stream_to: {:reply, from}, buffer: [_ | _]} = request) do
107
    GenServer.reply(from, Enum.reverse(request.buffer))
395✔
108
    struct!(request, stream_to: nil, buffer: [])
395✔
109
  end
110

111
  defp send_response(%__MODULE__{stream_to: {pid, ref}} = request) do
112
    request.buffer |> Enum.reverse() |> Enum.each(&send(pid, {ref, &1}))
203✔
113
    struct!(request, buffer: [])
203✔
114
  end
115

116
  defp send_response(%__MODULE__{stream_to: pid} = request) do
117
    request.buffer |> Enum.reverse() |> Enum.each(&send(pid, &1))
×
118
    struct!(request, buffer: [])
×
119
  end
120

121
  @spec map_response({:done, reference()} | {atom(), reference(), any}) ::
122
          {:done | {atom(), any}, reference()}
123
  def map_response({:done, ref}), do: {:done, ref}
136✔
124
  def map_response({type, ref, value}), do: {{type, value}, ref}
456✔
125

126
  @spec map_frame({:binary, binary} | {:close, any, any}) ::
127
          {:close, {integer(), binary()}}
128
          | {:error, binary}
129
          | {:stderr, binary}
130
          | {:stdout, binary}
131
  def map_frame({:close, code, reason}), do: {:close, {code, reason}}
1✔
132
  def map_frame({:binary, <<1, msg::binary>>}), do: {:stdout, msg}
1✔
133
  def map_frame({:binary, <<2, msg::binary>>}), do: {:stderr, msg}
×
134
  def map_frame({:binary, <<3, msg::binary>>}), do: {:error, msg}
×
135
  def map_frame({:binary, msg}), do: {:stdout, msg}
×
136

137
  @spec map_outgoing_frame({:stdin, binary()} | {:close, integer(), binary()} | :close | :exit) ::
138
          {:ok, :close | {:text, binary} | {:close, integer(), binary()}}
139
          | K8s.Client.HTTPError.t()
140
  def map_outgoing_frame({:stdin, data}), do: {:ok, {:text, <<0>> <> data}}
×
141
  def map_outgoing_frame(:close), do: {:ok, :close}
×
142
  def map_outgoing_frame(:exit), do: {:ok, :close}
×
143
  def map_outgoing_frame({:close, code, reason}), do: {:ok, {:close, code, reason}}
×
144

145
  def map_outgoing_frame(data) do
146
    K8s.Client.HTTPError.new(
×
147
      message: "The given message #{inspect(data)} is not supported to be sent to the Pod."
148
    )
149
  end
150

151
  @spec receive_upgrade_response(Mint.HTTP.t(), reference()) ::
152
          {:ok, Mint.HTTP.t(), map()} | {:error, Mint.HTTP.t(), Mint.Types.error()}
153
  def receive_upgrade_response(conn, ref) do
154
    Enum.reduce_while(Stream.cycle([:ok]), {conn, %{}}, fn _, {conn, response} ->
2✔
155
      case Mint.HTTP.recv(conn, 0, 5000) do
2✔
156
        {:ok, conn, parts} ->
157
          response =
2✔
158
            parts
159
            |> Map.new(fn
160
              {type, ^ref} -> {type, true}
2✔
161
              {type, ^ref, value} -> {type, value}
6✔
162
            end)
163
            |> Map.merge(response)
164

165
          # credo:disable-for-lines:3
166
          if response[:done],
2✔
167
            do: {:halt, {:ok, conn, response}},
168
            else: {:cont, {conn, response}}
169

170
        {:error, conn, error, _} ->
×
171
          {:halt, {:error, conn, error}}
172
      end
173
    end)
174
  end
175

176
  @spec stream_request_body(t(), Mint.HTTP.t()) ::
177
          {:ok, t(), Mint.HTTP.t()} | {:error, Mint.HTTP.t(), Mint.Types.error()}
178
  def stream_request_body(%__MODULE__{mode: mode} = req, conn) when mode != :pending,
179
    do: {:ok, req, conn}
379✔
180

181
  def stream_request_body(request, conn) do
182
    chunk_size = chunk_size(request, conn)
7✔
183

184
    %__MODULE__{
185
      request_ref: request_ref,
186
      pending_request_body: <<chunk::binary-size(chunk_size), remaining_request_body::binary>>
187
    } = request
7✔
188

189
    with {:ok, conn} <- Mint.HTTP.stream_request_body(conn, request_ref, chunk),
7✔
190
         {:remaining_request_body, conn, ""} <-
7✔
191
           {:remaining_request_body, conn, remaining_request_body},
192
         {:ok, conn} <- Mint.HTTP.stream_request_body(conn, request_ref, :eof) do
7✔
193
      {:ok, struct!(request, mode: :receiving, pending_request_body: nil), conn}
7✔
194
    else
195
      {:remaining_request_body, conn, remaining_request_body} ->
196
        {:ok, struct!(request, pending_request_body: remaining_request_body), conn}
×
197

198
      {:error, conn, error} ->
199
        {:error, conn, error}
×
200
    end
201
  end
202

203
  @spec chunk_size(t(), Mint.HTTP.t()) :: non_neg_integer()
204
  defp chunk_size(request, conn) do
205
    Enum.min([
7✔
206
      get_window_size(conn, {:request, request.request_ref}),
7✔
207
      get_window_size(conn, :connection),
208
      byte_size(request.pending_request_body)
7✔
209
    ])
210
  end
211

212
  @spec get_window_size(
213
          Mint.HTTP.t() | Mint.HTTP2.t(),
214
          :connection | {:request, Mint.Types.request_ref()}
215
        ) :: non_neg_integer
216
  defp get_window_size(conn, connection_or_request) do
217
    # This is necessary becuase conn types in Mint are opaque and dialyzer
218
    # would raise if we call Mint.HTTP2.get_window_size() directly with a
219
    # Mint.HTTP.t() conn.
220
    #
221
    # See [elixir-mint/mint#380](https://github.com/elixir-mint/mint/issues/380) for
222
    # the discussion on it.
223

224
    Mint.HTTP2.get_window_size(conn, connection_or_request)
14✔
225
  end
226
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc