• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Build has been canceled!

efcasado / pulsar-elixir / 729c8fcc86e8332be7ec95e9887459eee296dac6

15 Dec 2025 08:11PM UTC coverage: 79.276% (+1.5%) from 77.77%
729c8fcc86e8332be7ec95e9887459eee296dac6

push

github

web-flow
feat: batching for producers (#96)

58 of 68 new or added lines in 3 files covered. (85.29%)

1 existing line in 1 file now uncovered.

1117 of 1409 relevant lines covered (79.28%)

135.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.59
/test/support/utils.ex
1
defmodule Pulsar.Test.Support.Utils do
  @moduledoc false

  @doc """
  Polls `fun` until it returns a truthy value.

  `fun` is invoked at most `attempts` times, sleeping `interval_ms`
  milliseconds between consecutive invocations. No sleep happens after the
  final failed attempt.

  Returns `:ok` as soon as `fun` returns a truthy value, or `:error` when all
  attempts are exhausted (or when `attempts` is zero or negative, in which case
  `fun` is never called).
  """
  @spec wait_for((-> as_boolean(term())), integer(), timeout()) :: :ok | :error
  def wait_for(fun, attempts \\ 100, interval_ms \\ 100)

  # Guard on `<= 0` rather than matching a literal 0 so that a negative budget
  # terminates immediately instead of recursing forever.
  def wait_for(_fun, attempts, _interval_ms) when attempts <= 0, do: :error

  def wait_for(fun, attempts, interval_ms) do
    cond do
      fun.() ->
        :ok

      # Last attempt failed: return right away, sleeping again would only
      # delay the failure.
      attempts == 1 ->
        :error

      true ->
        Process.sleep(interval_ms)
        wait_for(fun, attempts - 1, interval_ms)
    end
  end

  @doc """
  Collects flow control telemetry events and returns aggregated statistics.

  Returns a map keyed by `consumer_id`. Each value is a map with the
  `:consumer_id`, the `:event_count` and the `:requested_total` (sum of
  `permits_requested` across that consumer's events).
  """
  def collect_flow_stats do
    [:pulsar, :consumer, :flow_control, :stop]
    |> collect_events()
    |> aggregate_flow_stats()
  end

  @doc """
  Collects lookup telemetry events and returns aggregated statistics.

  Returns a map with `:total_count`, `:success_count` and `:failure_count`.

  ## Options
    - `:client` - only events whose `:client` equals this value are counted
      (defaults to `:default`)
  """
  def collect_lookup_stats(opts \\ []) do
    client = Keyword.get(opts, :client, :default)

    [:pulsar, :service_discovery, :lookup_topic, :stop]
    |> collect_events()
    |> filter_by_client(client)
    |> aggregate_success_stats()
  end

  @doc """
  Collects producer opened telemetry events and returns aggregated statistics.

  Returns a map with `:total_count`, `:success_count` and `:failure_count`.

  ## Options
    - `:producer_names` - list of producer names to filter by (optional)
  """
  def collect_producer_opened_stats(opts \\ []) do
    [:pulsar, :producer, :opened, :stop]
    |> collect_events(opts)
    |> aggregate_success_stats()
  end

  @doc """
  Collects producer closed telemetry events and returns aggregated statistics.

  Returns a map with `:total_count`, `:success_count` and `:failure_count`.

  ## Options
    - `:producer_names` - list of producer names to filter by (optional)
  """
  def collect_producer_closed_stats(opts \\ []) do
    [:pulsar, :producer, :closed, :stop]
    |> collect_events(opts)
    |> aggregate_success_stats()
  end

  @doc """
  Collects message published telemetry events and returns aggregated statistics.

  Returns a map with `:total_count`.

  ## Options
    - `:producer_names` - list of producer names to filter by (optional)
  """
  def collect_message_published_stats(opts \\ []) do
    [:pulsar, :producer, :message, :published]
    |> collect_events(opts)
    |> then(fn events -> %{total_count: length(events)} end)
  end

  @doc """
  Collects all raw telemetry events for the given event name.

  Matching `{:telemetry_event, %{event: ..., measurements: ..., metadata: ...}}`
  messages are removed from the calling process' mailbox; messages for other
  events are left untouched. Only messages already in the mailbox are
  collected, this function does not wait for new ones.

  Returns a list of events, in arrival order, where each event is the
  measurements map merged with the metadata map (metadata wins on key clashes).

  ## Options
    - `:producer_names` - list of producer names to filter by (optional)
  """
  def collect_events(event_name, opts \\ []) do
    event_name
    |> do_collect_events([])
    |> filter_by_producer_names(opts)
  end

  # Groups flow control events per consumer and summarises each group.
  defp aggregate_flow_stats(events) do
    events
    |> Enum.group_by(& &1.consumer_id)
    |> Map.new(fn {consumer_id, consumer_events} ->
      requested_total =
        consumer_events
        |> Enum.map(& &1.permits_requested)
        |> Enum.sum()

      stats = %{
        consumer_id: consumer_id,
        event_count: length(consumer_events),
        requested_total: requested_total
      }

      {consumer_id, stats}
    end)
  end

  # Keeps only the events whose `:client` equals `client`. Events without a
  # `:client` key are compared as `nil`.
  defp filter_by_client(events, client) do
    Enum.filter(events, fn event ->
      Map.get(event, :client) == client
    end)
  end

  # Applies the optional `:producer_names` filter. Without the option all
  # events are returned unchanged.
  defp filter_by_producer_names(events, opts) do
    case Keyword.get(opts, :producer_names) do
      nil ->
        events

      names when is_list(names) ->
        Enum.filter(events, fn event ->
          Map.get(event, :producer_name) in names
        end)
    end
  end

  # Counts events overall and by their `:success` flag. Every event must carry
  # a `:success` key; a missing key raises.
  defp aggregate_success_stats(events) do
    %{
      total_count: length(events),
      success_count: Enum.count(events, &(&1.success == true)),
      failure_count: Enum.count(events, &(&1.success == false))
    }
  end

  # Drains every already-delivered telemetry message for `event_name` from the
  # mailbox. `after 0` makes the receive non-blocking, so the loop stops as
  # soon as no matching message is left.
  defp do_collect_events(event_name, acc) do
    receive do
      {:telemetry_event,
       %{
         event: ^event_name,
         measurements: measurements,
         metadata: metadata
       }} ->
        event = Map.merge(measurements, metadata)
        do_collect_events(event_name, [event | acc])
    after
      0 -> Enum.reverse(acc)
    end
  end

  @doc """
  Waits for the producer to be ready.

  Polls with `wait_for/3` using its default attempts and interval. On each poll
  the first producer returned by `Pulsar.get_producers/1` for `group_pid` is
  inspected via `:sys.get_state/1`, and its `ready` field must be `true`.

  Returns `:ok` once the producer is ready, `:error` otherwise.
  """
  def wait_for_producer_ready(group_pid) do
    wait_for(fn ->
      case Pulsar.get_producers(group_pid) do
        [p | _] -> :sys.get_state(p).ready == true
        _ -> false
      end
    end)
  end

  @doc """
  Waits for the specified number of consumers to be ready.

  Consumers are considered ready when they send a `{:consumer_ready, pid}` message.
  This is typically used with the DummyConsumer callback which notifies when initialized.

  `timeout` (milliseconds) applies to each consumer individually, not to the
  call as a whole. If a consumer does not report in time the current test is
  failed through `ExUnit.Assertions.flunk/1`.

  Returns a list of consumer PIDs in the order they became ready. A `count` of
  zero (or less) returns `[]` without waiting.
  """
  def wait_for_consumer_ready(count, timeout \\ 5000) do
    # The explicit `//1` step keeps the range empty when `count` is 0; a plain
    # `1..0` is a descending range and would wait for two consumers.
    Enum.map(1..count//1, fn _ ->
      receive do
        {:consumer_ready, pid} -> pid
      after
        timeout -> ExUnit.Assertions.flunk("Timeout waiting for consumer to be ready")
      end
    end)
  end
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc