• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / f1f00ddb770b1eff4e47a0957ac3cb0ce13ea930-PR-1574

14 Oct 2025 11:17PM UTC coverage: 85.624% (-0.3%) from 85.908%
f1f00ddb770b1eff4e47a0957ac3cb0ce13ea930-PR-1574

Pull #1574

github

filipecabaco
move test out of integration testing
Pull Request #1574: fix: validate size of track message

8 of 9 new or added lines in 2 files covered. (88.89%)

7 existing lines in 2 files now uncovered.

2156 of 2518 relevant lines covered (85.62%)

2670.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.67
/lib/realtime/monitoring/latency.ex
1
defmodule Realtime.Latency do
  @moduledoc """
  Measures the latency of the cluster from each node and broadcasts it over PubSub.

  The server schedules itself a `:ping` message every `@every` milliseconds. On each
  tick it calls `ping/3`, which issues an RPC to `pong/1` on every connected node
  (including the local one), times the round trip and broadcasts a `Payload` on the
  `"admin:cluster"` topic.
  """

  use GenServer
  use Realtime.Logs

  alias Realtime.Nodes
  alias Realtime.GenRpc

  defmodule Payload do
    @moduledoc false

    defstruct [
      :from_node,
      :from_region,
      :node,
      :region,
      :latency,
      :response,
      :timestamp
    ]

    # `latency` is in milliseconds and is a float because it is derived from the
    # microsecond reading of `:timer.tc/1` divided by 1_000.
    # `response` is the raw RPC result as matched in `Realtime.Latency.ping/3`.
    @type t :: %__MODULE__{
            node: atom(),
            region: String.t() | nil,
            from_node: atom(),
            from_region: String.t(),
            latency: float(),
            response: {:ok, {:pong, String.t()}} | {:error, :rpc_error, term()},
            timestamp: DateTime.t()
          }
  end

  # Interval between two cluster pings, in milliseconds.
  @every 15_000

  @doc """
  Starts the latency monitor registered under the module name.
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(args) do
    GenServer.start_link(__MODULE__, args, name: __MODULE__)
  end

  @impl true
  def init(_args) do
    ping_after()

    {:ok, []}
  end

  @impl true
  def handle_info(:ping, state) do
    ping()
    ping_after()

    {:noreply, state}
  end

  def handle_info(msg, state) do
    Logger.warning("Unexpected message: #{inspect(msg)}")
    {:noreply, state}
  end

  @impl true
  def handle_cast({:ping, pong_timeout, timer_timeout, yield_timeout}, state) do
    # For testing
    ping(pong_timeout, timer_timeout, yield_timeout)

    {:noreply, state}
  end

  @doc """
  Pings every node in the cluster (the local node included) and returns the results.

  One task per node is started under `Realtime.TaskSupervisor`, so the pings run
  concurrently. Each task times an RPC to `pong/1` on the target node, broadcasts a
  `Payload` on `"admin:cluster"` and returns that payload.

  ## Parameters

    * `pong_timeout` - milliseconds the remote `pong/1` sleeps before answering.
    * `timer_timeout` - timeout passed to the RPC call for a single node.
    * `yield_timeout` - how long to wait for all tasks; tasks that have not replied by
      then are shut down with `:brutal_kill` and appear with a `nil` result.

  Returns the list produced by `Task.yield_many/2`.
  """

  @spec ping :: [{Task.t(), {:ok, Payload.t()} | {:exit, term()} | nil}]
  @spec ping(:infinity | non_neg_integer(), timeout(), timeout()) ::
          [{Task.t(), {:ok, Payload.t()} | {:exit, term()} | nil}]
  def ping(pong_timeout \\ 0, timer_timeout \\ 5_000, yield_timeout \\ 5_000) do
    results =
      [Node.self() | Node.list()]
      |> Enum.map(fn node ->
        Task.Supervisor.async(Realtime.TaskSupervisor, fn ->
          ping_node(node, pong_timeout, timer_timeout)
        end)
      end)
      |> Task.yield_many(yield_timeout)

    # A `nil` result means the task did not reply within `yield_timeout`.
    for {task, nil} <- results, do: Task.shutdown(task, :brutal_kill)

    results
  end

  @doc """
  A noop function to call from a remote server.

  Returns the region configured for this node, or `"not_set"` when none is configured.
  """

  @spec pong :: {:ok, {:pong, String.t()}}
  def pong do
    region = Application.get_env(:realtime, :region, "not_set")
    {:ok, {:pong, region}}
  end

  @doc """
  Same as `pong/0`, but sleeps for `latency` milliseconds (or forever when given
  `:infinity`) before answering.
  """
  @spec pong(:infinity | non_neg_integer) :: {:ok, {:pong, String.t()}}
  def pong(latency) when latency == :infinity or (is_integer(latency) and latency >= 0) do
    Process.sleep(latency)
    pong()
  end

  # Times a single RPC to `node`, logs the outcome, broadcasts the payload and returns it.
  defp ping_node(node, pong_timeout, timer_timeout) do
    {latency_us, response} =
      :timer.tc(fn ->
        GenRpc.call(node, __MODULE__, :pong, [pong_timeout], timeout: timer_timeout)
      end)

    latency_ms = latency_us / 1_000
    region = Application.get_env(:realtime, :region, "not_set")
    short_name = Nodes.short_node_id_from_name(node)
    from_node = Nodes.short_node_id_from_name(Node.self())

    remote_region =
      case response do
        {:error, :rpc_error, reason} ->
          # `inspect/1` because the reason is not guaranteed to implement String.Chars.
          log_error(
            "RealtimeNodeDisconnected",
            "Unable to connect to #{short_name} from #{region}: #{inspect(reason)}"
          )

          nil

        {:ok, {:pong, remote_region}} ->
          if latency_ms > 1_000 do
            Logger.warning(
              "Network warning: latency to #{remote_region} (#{short_name}) from #{region} (#{from_node}) is #{latency_ms} ms"
            )
          end

          remote_region
      end

    payload = %Payload{
      from_node: from_node,
      from_region: region,
      node: short_name,
      region: remote_region,
      latency: latency_ms,
      response: response,
      timestamp: DateTime.utc_now()
    }

    RealtimeWeb.Endpoint.broadcast("admin:cluster", "pong", payload)

    payload
  end

  # Schedules the next `:ping` tick for this process.
  defp ping_after do
    Process.send_after(self(), :ping, @every)
  end
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc