• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / 5fb0cae9cf21cf22cb84bb0c6cc9d393d881e126-PR-1357

02 May 2025 09:03AM UTC coverage: 81.732% (-0.05%) from 81.779%
5fb0cae9cf21cf22cb84bb0c6cc9d393d881e126-PR-1357

Pull #1357

github

edgurgel
feat: add make help

Based on https://gist.github.com/prwhite/8168133
Pull Request #1357: feat: add make help

1727 of 2113 relevant lines covered (81.73%)

745.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.67
/lib/realtime/monitoring/latency.ex
1
defmodule Realtime.Latency do
2
  @moduledoc """
3
    Measures the latency of the cluster from each node and broadcasts it over PubSub.
4
  """
5

6
  use GenServer
7
  require Logger
8
  import Realtime.Logs
9

10
  alias Realtime.Nodes
11
  alias Realtime.Rpc
12

13
  defmodule Payload do
14
    @moduledoc false
15

16
    defstruct [
17
      :from_node,
18
      :from_region,
19
      :node,
20
      :region,
21
      :latency,
22
      :response,
23
      :timestamp
24
    ]
25

26
    @type t :: %__MODULE__{
27
            node: atom(),
28
            region: String.t() | nil,
29
            from_node: atom(),
30
            from_region: String.t(),
31
            latency: integer(),
32
            response: {:ok, :pong} | {:badrpc, any()},
33
            timestamp: DateTime
34
          }
35
  end
36

37
  @every 5_000
38
  def start_link(args) do
39
    GenServer.start_link(__MODULE__, args, name: __MODULE__)
×
40
  end
41

42
  def init(_args) do
43
    ping_after()
×
44

45
    {:ok, []}
46
  end
47

48
  def handle_info(:ping, state) do
49
    ping()
149✔
50
    ping_after()
149✔
51

52
    {:noreply, state}
53
  end
54

55
  def handle_info(msg, state) do
56
    Logger.warning("Unexpected message: #{inspect(msg)}")
×
57
    {:noreply, state}
58
  end
59

60
  def handle_cast({:ping, pong_timeout, timer_timeout, yield_timeout}, state) do
61
    # For testing
62
    ping(pong_timeout, timer_timeout, yield_timeout)
×
63

64
    {:noreply, state}
65
  end
66

67
  @doc """
68
  Pings all the nodes in the cluster one after another and returns with their responses.
69
  There is a timeout for a single node rpc, and a timeout to yield_many which should really
70
  never get hit because these pings happen async under the Realtime.TaskSupervisor.
71
  """
72

73
  @spec ping :: [{Task.t(), tuple() | nil}]
74
  def ping(pong_timeout \\ 0, timer_timeout \\ 5_000, yield_timeout \\ 5_000) do
75
    tasks =
149✔
76
      for n <- [Node.self() | Node.list()] do
77
        Task.Supervisor.async(Realtime.TaskSupervisor, fn ->
157✔
78
          {latency, response} =
157✔
79
            :timer.tc(fn ->
80
              Rpc.call(n, __MODULE__, :pong, [pong_timeout], timeout: timer_timeout)
157✔
81
            end)
82

83
          latency_ms = latency / 1_000
157✔
84
          region = Application.get_env(:realtime, :region, "not_set")
157✔
85
          short_name = Nodes.short_node_id_from_name(n)
157✔
86
          from_node = Nodes.short_node_id_from_name(Node.self())
157✔
87

88
          case response do
157✔
89
            {:badrpc, reason} ->
90
              log_error(
×
91
                "RealtimeNodeDisconnected",
92
                "Unable to connect to #{short_name} from #{region}: #{reason}"
×
93
              )
94

95
              payload = %Payload{
×
96
                from_node: from_node,
97
                from_region: region,
98
                node: short_name,
99
                region: nil,
100
                latency: latency_ms,
101
                response: response,
102
                timestamp: DateTime.utc_now()
103
              }
104

105
              RealtimeWeb.Endpoint.broadcast("admin:cluster", "pong", payload)
×
106

107
              payload
×
108

109
            {:ok, {:pong, remote_region}} ->
110
              if latency_ms > 1_000,
157✔
111
                do:
112
                  Logger.warning(
×
113
                    "Network warning: latency to #{remote_region} (#{short_name}) from #{region} (#{from_node}) is #{latency_ms} ms"
×
114
                  )
115

116
              payload = %Payload{
157✔
117
                from_node: from_node,
118
                from_region: region,
119
                node: short_name,
120
                region: remote_region,
121
                latency: latency_ms,
122
                response: response,
123
                timestamp: DateTime.utc_now()
124
              }
125

126
              RealtimeWeb.Endpoint.broadcast("admin:cluster", "pong", payload)
157✔
127

128
              payload
157✔
129
          end
130
        end)
131
      end
132
      |> Task.yield_many(yield_timeout)
133

134
    for {task, result} <- tasks do
149✔
135
      unless result, do: Task.shutdown(task, :brutal_kill)
157✔
136
    end
137

138
    tasks
149✔
139
  end
140

141
  @doc """
142
  A noop function to call from a remote server.
143
  """
144

145
  @spec pong :: {:ok, {:pong, String.t()}}
146
  def pong do
147
    region = Application.get_env(:realtime, :region, "not_set")
149✔
148
    {:ok, {:pong, region}}
149
  end
150

151
  @spec pong(:infinity | non_neg_integer) :: {:ok, {:pong, String.t()}}
152
  def pong(latency) when is_integer(latency) do
153
    Process.sleep(latency)
149✔
154
    pong()
149✔
155
  end
156

157
  defp ping_after do
158
    Process.send_after(self(), :ping, @every)
149✔
159
  end
160
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc