• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

efcasado / pulsar-elixir / 6aa583728d53acb6ff28bea829dfcf551de67e4b-PR-68

29 Nov 2025 09:39AM UTC coverage: 69.694%. First build
6aa583728d53acb6ff28bea829dfcf551de67e4b-PR-68

Pull #68

github

efcasado
update client reliability tests
Pull Request #68: rework tests

16 of 19 new or added lines in 4 files covered. (84.21%)

729 of 1046 relevant lines covered (69.69%)

72.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.77
/test/support/system.ex
1
defmodule Pulsar.Test.Support.System do
  @moduledoc false

  # Test-support helpers that drive a two-broker Pulsar cluster through the
  # `docker` CLI: starting/stopping the compose stack, killing a broker,
  # running `pulsar-admin` / `pulsar-client` inside a broker container, and
  # locating which broker currently hosts a given consumer or producer.
  #
  # Every CLI invocation is executed inside the container of the same broker
  # whose URL is passed to the CLI, so a call never mixes two brokers.

  alias Pulsar.Test.Support.Utils

  require Logger

  @type broker_info :: %{
          container: String.t(),
          host: String.t(),
          web_port: pos_integer(),
          service_port: pos_integer(),
          admin_url: String.t(),
          service_url: String.t(),
          health_url: String.t()
        }

  @type cli_error :: %{exit_code: integer(), message: String.t()}

  @brokers [
    %{
      container: "broker1",
      host: "broker1",
      web_port: 8080,
      service_port: 6650,
      admin_url: "http://broker1:8080",
      service_url: "pulsar://broker1:6650",
      health_url: "http://broker1:8080/admin/v2/brokers/health"
    },
    %{
      container: "broker2",
      host: "broker2",
      web_port: 8081,
      service_port: 6651,
      admin_url: "http://broker2:8081",
      service_url: "pulsar://broker2:6651",
      health_url: "http://broker2:8081/admin/v2/brokers/health"
    }
  ]

  @doc """
  Returns one of the configured brokers, chosen at random.
  """
  @spec broker() :: broker_info()
  def broker do
    Enum.random(@brokers)
  end

  @doc """
  Returns every configured broker.
  """
  @spec brokers() :: [broker_info()]
  def brokers do
    @brokers
  end

  @doc """
  Kills the container of `broker` with `docker kill`.

  Raises `MatchError` when the command exits with a non-zero status.
  """
  @spec kill_broker(broker_info()) :: :ok
  def kill_broker(%{container: container} = _broker) do
    {_out, 0} = System.cmd("docker", ["kill", container], stderr_to_stdout: true)
    :ok
  end

  @doc """
  Returns the first configured broker whose consumer list (as reported by
  `Pulsar.Broker.get_consumers/2`) contains the pid `consumer`, or `nil`.
  """
  @spec broker_for_consumer(pid(), term()) :: broker_info() | nil
  def broker_for_consumer(consumer, client \\ :default) when is_pid(consumer) do
    Enum.find(@brokers, fn broker ->
      broker.service_url
      |> Pulsar.Broker.get_consumers(client: client)
      |> Enum.any?(fn {_id, pid} -> pid == consumer end)
    end)
  end

  @doc """
  Returns the first configured broker whose producer list (as reported by
  `Pulsar.Broker.get_producers/2`) contains the pid `producer`, or `nil`.
  """
  @spec broker_for_producer(pid(), term()) :: broker_info() | nil
  def broker_for_producer(producer, client \\ :default) when is_pid(producer) do
    Enum.find(@brokers, fn broker ->
      broker.service_url
      |> Pulsar.Broker.get_producers(client: client)
      |> Enum.any?(fn {_id, pid} -> pid == producer end)
    end)
  end

  @doc """
  Returns whatever `Pulsar.Broker.get_consumers/2` reports for `broker_url`.
  """
  @spec consumers_on(String.t(), term()) :: term()
  def consumers_on(broker_url, client \\ :default) when is_binary(broker_url) do
    Pulsar.Broker.get_consumers(broker_url, client: client)
  end

  @doc """
  Brings the compose stack up and blocks until every broker answers its
  health check.

  Raises `MatchError` when `docker compose up -d` fails or when the wait
  helper does not return `:ok`.
  """
  @spec start_pulsar() :: :ok
  def start_pulsar do
    Logger.info("Starting Pulsar ...")
    {_output, 0} = System.cmd("docker", ["compose", "up", "-d"], stderr_to_stdout: true)

    # NOTE(review): the two integers are presumably the retry count and the
    # delay between retries; confirm against Utils.wait_for/3.
    :ok = Utils.wait_for(&brokers_up?/0, _attempts = 100, _delay = 100)
  end

  @doc """
  Tears the compose stack down.

  Raises `MatchError` when `docker compose down` fails.
  """
  @spec stop_pulsar() :: :ok
  def stop_pulsar do
    Logger.info("Stopping Pulsar ...")
    {_output, 0} = System.cmd("docker", ["compose", "down"], stderr_to_stdout: true)
    :ok
  end

  @doc """
  Creates `namespace` through `pulsar-admin` on a randomly chosen broker.

  Raises `MatchError` when the command exits with a non-zero status.
  """
  @spec create_namespace(String.t()) :: :ok
  def create_namespace(namespace) do
    pulsar_admin!(broker(), ["namespaces", "create", namespace])
  end

  @doc """
  Creates `topic` through `pulsar-admin` on a randomly chosen broker.

  With `partitions` equal to `0` (the default) a non-partitioned topic is
  created; any other value is passed to `create-partitioned-topic`.

  Raises `MatchError` when the command exits with a non-zero status.
  """
  @spec create_topic(String.t(), non_neg_integer()) :: :ok
  def create_topic(topic, partitions \\ 0)

  def create_topic(topic, 0) do
    pulsar_admin!(broker(), ["topics", "create", topic])
  end

  def create_topic(topic, n) do
    pulsar_admin!(broker(), [
      "topics",
      "create-partitioned-topic",
      topic,
      "--partitions",
      "#{n}"
    ])
  end

  @doc """
  Unloads `topic` through `pulsar-admin` on a randomly chosen broker.

  Raises `MatchError` when the command exits with a non-zero status.
  """
  @spec unload_topic(String.t()) :: :ok
  def unload_topic(topic) do
    pulsar_admin!(broker(), ["topics", "unload", topic])
  end

  @doc """
  Publishes `messages` to `topic` with `pulsar-client`, one CLI call per
  message, inside the container of `broker`.

  Each element is either a binary payload or a `{key, payload}` tuple.

  Raises `MatchError` when any invocation exits with a non-zero status.
  """
  @spec produce_messages(String.t(), Enumerable.t(), broker_info()) :: :ok
  def produce_messages(topic, messages, broker \\ broker()) do
    base_cmd = ["bin/pulsar-client", "--url", broker.service_url, "produce", topic]

    # Run inside the container of the broker the caller selected; picking a
    # random container here could target one that has been killed.
    Enum.each(messages, fn
      {key, message} ->
        {_, 0} = docker_exec(broker.container, base_cmd ++ ["-m", message, "-k", key])

      message when is_binary(message) ->
        {_, 0} = docker_exec(broker.container, base_cmd ++ ["-m", message])
    end)

    :ok
  end

  @doc """
  Lists the subscriptions of `topic` as reported by `pulsar-admin`.

  Returns `{:ok, names}` with the whitespace-separated tokens of the CLI
  output, or `{:error, %{exit_code: code, message: output}}` on failure.
  """
  @spec topic_subscriptions(String.t(), broker_info()) ::
          {:ok, [String.t()]} | {:error, cli_error()}
  def topic_subscriptions(topic, broker \\ broker()) do
    broker
    |> pulsar_admin(["topics", "subscriptions", topic])
    |> cli_result(&String.split/1)
  end

  @doc """
  Lists the topics of `namespace` as reported by `pulsar-admin`.

  Returns `{:ok, topics}` with one trimmed, non-empty entry per output line,
  or `{:error, %{exit_code: code, message: output}}` on failure.
  """
  @spec list_topics(String.t(), broker_info()) :: {:ok, [String.t()]} | {:error, cli_error()}
  def list_topics(namespace \\ "public/default", broker \\ broker()) do
    broker
    |> pulsar_admin(["topics", "list", namespace])
    |> cli_result(&parse_lines/1)
  end

  # Splits CLI output into trimmed, non-empty lines.
  defp parse_lines(output) do
    output
    |> String.split("\n", trim: true)
    |> Enum.map(&String.trim/1)
    |> Enum.reject(&(&1 == ""))
  end

  # Converts a `System.cmd/3` result into a tagged tuple, parsing the output
  # with `parse` on success.
  defp cli_result({output, 0}, parse), do: {:ok, parse.(output)}

  defp cli_result({output, exit_code}, _parse) do
    {:error, %{exit_code: exit_code, message: output}}
  end

  # Runs `pulsar-admin` with `args` inside `broker`'s container, pointed at
  # that same broker's admin URL. Returns the raw `{output, exit_code}`.
  defp pulsar_admin(broker, args) do
    docker_exec(broker.container, ["bin/pulsar-admin", "--admin-url", broker.admin_url | args])
  end

  # Same as `pulsar_admin/2`, but asserts a zero exit status.
  defp pulsar_admin!(broker, args) do
    {_output, 0} = pulsar_admin(broker, args)
    :ok
  end

  # Runs `command` inside `container` via `docker exec`, merging stderr into
  # the returned output.
  defp docker_exec(container, command) do
    System.cmd("docker", ["exec", container | command], stderr_to_stdout: true)
  end

  # True when every configured broker passes its health check.
  defp brokers_up? do
    Enum.all?(@brokers, &broker_up?/1)
  end

  # True when curl exits with 0 and the health endpoint body is "ok"
  # (surrounding whitespace ignored).
  defp broker_up?(broker) do
    case System.cmd("curl", ["-s", broker.health_url]) do
      {output, 0} -> String.trim(output) == "ok"
      _other -> false
    end
  end
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc