• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tlux / graphql_ws_client / d0689f07e24af8784d6e1056ab63715fd4f7e2f3

29 Mar 2024 05:17PM UTC coverage: 95.208% (-0.2%) from 95.44%
d0689f07e24af8784d6e1056ab63715fd4f7e2f3

push

github

tlux
fix: credo warning

298 of 313 relevant lines covered (95.21%)

47.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.11
/lib/graphql_ws_client/iterator.ex
1
defmodule GraphQLWSClient.Iterator do
2
  @moduledoc false
3

4
  use GenServer
5

6
  require Logger
7

8
  import GraphQLWSClient.FormatLog
9

10
  alias GraphQLWSClient.Event
11
  alias GraphQLWSClient.Iterator.{Opts, State}
12

13
  @type iterator :: GenServer.server()
14

15
  @spec start_link(Opts.valid()) :: GenServer.on_start()
16
  def start_link(opts) do
17
    GenServer.start_link(__MODULE__, opts)
30✔
18
  end
19

20
  @spec start(Opts.valid()) :: GenServer.on_start()
21
  def start(opts) do
22
    GenServer.start(__MODULE__, opts)
×
23
  end
24

25
  @spec open!(
26
          GraphQLWSClient.client(),
27
          GraphQLWSClient.query(),
28
          GraphQLWSClient.variables(),
29
          Keyword.t()
30
        ) :: iterator | no_return
31
  def open!(client, query, variables \\ %{}, opts \\ []) do
32
    opts =
21✔
33
      opts
34
      |> Keyword.merge(
35
        client: client,
36
        query: query,
37
        variables: variables
38
      )
39
      |> Opts.new()
40
      |> Opts.validate!()
41

42
    {:ok, iterator} = start_link(opts)
15✔
43
    iterator
15✔
44
  end
45

46
  @spec close(iterator) :: :ok
47
  def close(iterator) do
48
    GenServer.stop(iterator)
12✔
49
  end
50

51
  @spec next(iterator) :: {:ok, [any]} | {:error, Exception.t()} | :halt
52
  def next(iterator) do
53
    GenServer.call(iterator, :next, :infinity)
60✔
54
  end
55

56
  @spec child_spec(term) :: Supervisor.child_spec()
57
  def child_spec(opts) do
58
    %{
15✔
59
      id: __MODULE__,
60
      start: {__MODULE__, :start_link, [opts]}
61
    }
62
  end
63

64
  # Callbacks
65

66
  @impl true
67
  def init(%Opts{} = opts) do
68
    Process.flag(:trap_exit, true)
30✔
69

70
    case GraphQLWSClient.subscribe(opts.client, opts.query, opts.variables) do
30✔
71
      {:ok, subscription_id} ->
30✔
72
        {:ok,
73
         %State{
74
           buffer_size: opts.buffer_size,
30✔
75
           client: opts.client,
30✔
76
           monitor_ref: Process.monitor(opts.client),
30✔
77
           subscription_id: subscription_id
78
         }}
79

80
      {:error, error} ->
×
81
        {:stop, error}
82
    end
83
  end
84

85
  @impl true
86
  def terminate(_reason, %State{
87
        client: client,
88
        monitor_ref: monitor_ref,
89
        subscription_id: subscription_id
90
      }) do
91
    if monitor_ref do
30✔
92
      Process.demonitor(monitor_ref, [:flush])
30✔
93
    end
94

95
    if subscription_id do
30✔
96
      GraphQLWSClient.unsubscribe(client, subscription_id)
12✔
97
    end
98
  end
99

100
  @impl true
101
  def handle_call(:next, _from, %State{halted?: true, buffer: []} = state) do
102
    {:reply, :halt, state}
9✔
103
  end
104

105
  def handle_call(:next, from, %State{buffer: []} = state) do
45✔
106
    {:noreply, %{state | from: from}}
107
  end
108

109
  def handle_call(:next, _from, %State{} = state) do
110
    {:reply, {:ok, Enum.reverse(state.buffer)},
6✔
111
     %{state | buffer: [], from: nil}}
112
  end
113

114
  @impl true
115
  def handle_info(
3✔
116
        {:DOWN, ref, :process, _pid, _reason},
117
        %State{monitor_ref: ref} = state
118
      ) do
119
    {:noreply, halt_and_reply(state)}
120
  end
121

122
  def handle_info(%Event{type: :complete}, %State{} = state) do
9✔
123
    {:noreply, halt_and_reply(state)}
124
  end
125

126
  def handle_info(%Event{type: :error, payload: error}, %State{} = state) do
127
    Logger.error(format_log("Iteration halted: #{Exception.message(error)}"))
6✔
128
    GenServer.reply(state.from, {:error, error})
6✔
129
    {:noreply, halt(state)}
130
  end
131

132
  def handle_info(
133
        %Event{type: :next, payload: payload},
134
        %State{from: nil} = state
135
      ) do
136
    buffer = truncate_buffer([payload | state.buffer], state.buffer_size)
36✔
137
    {:noreply, %{state | buffer: buffer}}
138
  end
139

140
  def handle_info(%Event{type: :next, payload: payload}, %State{} = state) do
141
    GenServer.reply(state.from, {:ok, Enum.reverse([payload | state.buffer])})
30✔
142
    {:noreply, %{state | buffer: [], from: nil}}
143
  end
144

145
  # Helpers
146

147
  defp halt(state) do
148
    %{state | from: nil, halted?: true, subscription_id: nil}
18✔
149
  end
150

151
  defp halt_and_reply(state) do
152
    if state.from do
12✔
153
      reply =
6✔
154
        case state.buffer do
6✔
155
          [] -> :halt
6✔
156
          buffer -> {:ok, buffer}
×
157
        end
158

159
      GenServer.reply(state.from, reply)
6✔
160
    end
161

162
    halt(state)
12✔
163
  end
164

165
  defp truncate_buffer(list, :infinity), do: list
18✔
166

167
  defp truncate_buffer(list, size), do: Enum.take(list, size)
18✔
168
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc