• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / c5c8f617a3ad63a2c3506a1b61ecd513ebad1ed3

07 May 2025 08:44PM UTC coverage: 82.461% (+0.4%) from 82.079%
c5c8f617a3ad63a2c3506a1b61ecd513ebad1ed3

push

github

web-flow
fix: remove region from syn conflict handling & non found process on register_process (#1363)

7 of 10 new or added lines in 4 files covered. (70.0%)

8 existing lines in 3 files now uncovered.

1749 of 2121 relevant lines covered (82.46%)

1349.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.67
/lib/realtime/monitoring/latency.ex
1
defmodule Realtime.Latency do
2
  @moduledoc """
3
    Measures the latency of the cluster from each node and broadcasts it over PubSub.
4
  """
5

6
  use GenServer
7
  require Logger
8
  import Realtime.Logs
9

10
  alias Realtime.Nodes
11
  alias Realtime.Rpc
12

13
  defmodule Payload do
    @moduledoc false

    # Result of a single latency probe, as broadcast on the "admin:cluster" topic.
    #
    #   * `from_node` / `from_region` - the node that issued the ping and its region
    #   * `node` / `region`           - the node that was pinged and the region it reported;
    #                                   `region` is `nil` when the RPC failed
    #   * `latency`                   - round-trip time in milliseconds; it is computed as
    #                                   `microseconds / 1_000`, so it is always a float
    #   * `response`                  - the raw RPC result
    #   * `timestamp`                 - UTC time at which the payload was built
    defstruct [
      :from_node,
      :from_region,
      :node,
      :region,
      :latency,
      :response,
      :timestamp
    ]

    # NOTE(review): `node` / `from_node` hold whatever `Nodes.short_node_id_from_name/1`
    # returns; they are kept as `atom()` here - confirm against that helper.
    @type t :: %__MODULE__{
            node: atom(),
            region: String.t() | nil,
            from_node: atom(),
            from_region: String.t(),
            latency: float(),
            response: {:ok, {:pong, String.t()}} | {:badrpc, any()},
            timestamp: DateTime.t()
          }
  end
36

37
  # Delay, in milliseconds, between two scheduled `:ping` messages (see `ping_after/0`).
  @every 5_000
38
  @doc """
  Starts the latency monitor as a GenServer registered under this module's name.

  `args` is forwarded untouched to `init/1`.
  """
  def start_link(args) do
    server_opts = [name: __MODULE__]

    GenServer.start_link(__MODULE__, args, server_opts)
  end
41

42
  # GenServer callback. Schedules the first `:ping` message and starts with an
  # empty list as state; the start arguments are ignored.
  def init(_args) do
    _timer_ref = ping_after()

    {:ok, []}
  end
47

48
  # Periodic tick: ping the cluster, then schedule the next tick. The next tick is
  # only scheduled once `ping/0` has returned.
  def handle_info(:ping, state) do
    _results = ping()
    _timer_ref = ping_after()

    {:noreply, state}
  end

  # Any other message is logged and otherwise ignored; the state is left untouched.
  def handle_info(unexpected, state) do
    Logger.warning("Unexpected message: #{inspect(unexpected)}")

    {:noreply, state}
  end
59

60
  # Test hook: runs one ping round with caller-supplied timeouts instead of the defaults.
  def handle_cast({:ping, pong_delay, rpc_timeout, wait_timeout}, state) do
    _results = ping(pong_delay, rpc_timeout, wait_timeout)

    {:noreply, state}
  end
66

67
  @doc """
  Pings every node in the cluster, including the local one, and returns the outcome per node.

  One task per node is started under `Realtime.TaskSupervisor`, so the nodes are pinged
  concurrently. Each task times an RPC to `pong/1` on its target node, broadcasts the
  resulting `Payload` as a `"pong"` event on the `"admin:cluster"` topic and returns it.
  A failed RPC is logged as an error; a successful one slower than 1000 ms is logged as a
  warning.

  ## Parameters

    * `pong_timeout` - argument handed to `pong/1` on the target node, i.e. how long that
      node sleeps (in milliseconds) before answering. Defaults to `0`.
    * `timer_timeout` - passed as the `:timeout` option of the RPC call. Defaults to `5_000`.
    * `yield_timeout` - how long `Task.yield_many/2` waits for the tasks. This should really
      never be hit, because each RPC has its own timeout. Defaults to `5_000`.

  ## Returns

  The list of `{task, result}` tuples produced by `Task.yield_many/2`. `result` is
  `{:ok, payload}` for a finished task, `{:exit, reason}` for a crashed one and `nil` for a
  task that did not finish within `yield_timeout`; such tasks are killed before returning.
  """

  @spec ping() :: [{Task.t(), {:ok, Payload.t()} | {:exit, term()} | nil}]
  @spec ping(non_neg_integer(), timeout(), timeout()) ::
          [{Task.t(), {:ok, Payload.t()} | {:exit, term()} | nil}]
  def ping(pong_timeout \\ 0, timer_timeout \\ 5_000, yield_timeout \\ 5_000) do
    results =
      [Node.self() | Node.list()]
      |> Enum.map(fn target ->
        Task.Supervisor.async(Realtime.TaskSupervisor, fn ->
          ping_node(target, pong_timeout, timer_timeout)
        end)
      end)
      |> Task.yield_many(yield_timeout)

    # A `nil` result means the task is still running; kill it so it cannot linger.
    for {task, nil} <- results, do: Task.shutdown(task, :brutal_kill)

    results
  end

  # Times one RPC round trip to `target`, broadcasts the resulting payload and returns it.
  defp ping_node(target, pong_timeout, timer_timeout) do
    {latency_us, response} =
      :timer.tc(fn ->
        Rpc.call(target, __MODULE__, :pong, [pong_timeout], timeout: timer_timeout)
      end)

    # :timer.tc/1 reports microseconds.
    latency_ms = latency_us / 1_000
    region = Application.get_env(:realtime, :region, "not_set")
    short_name = Nodes.short_node_id_from_name(target)
    from_node = Nodes.short_node_id_from_name(Node.self())

    remote_region =
      case response do
        {:badrpc, reason} ->
          # `reason` may be a tuple, which has no String.Chars implementation, so it
          # must go through inspect/1 rather than plain interpolation.
          log_error(
            "RealtimeNodeDisconnected",
            "Unable to connect to #{short_name} from #{region}: #{inspect(reason)}"
          )

          nil

        {:ok, {:pong, remote_region}} ->
          if latency_ms > 1_000 do
            Logger.warning(
              "Network warning: latency to #{remote_region} (#{short_name}) from #{region} (#{from_node}) is #{latency_ms} ms"
            )
          end

          remote_region
      end

    payload = %Payload{
      from_node: from_node,
      from_region: region,
      node: short_name,
      region: remote_region,
      latency: latency_ms,
      response: response,
      timestamp: DateTime.utc_now()
    }

    RealtimeWeb.Endpoint.broadcast("admin:cluster", "pong", payload)

    payload
  end
140

141
  @doc """
  Replies with `{:ok, {:pong, region}}`, meant to be invoked from a remote node.

  `region` is the `:region` value of the `:realtime` application environment, or
  `"not_set"` when it is not configured.
  """

  @spec pong :: {:ok, {:pong, String.t()}}
  def pong do
    {:ok, {:pong, Application.get_env(:realtime, :region, "not_set")}}
  end
150

151
  @doc """
  Same as `pong/0`, but sleeps for `latency` milliseconds before replying.

  `ping/3` passes its `pong_timeout` argument here. Only integers are accepted: the
  guard rejects `:infinity`, so the spec does not advertise it.
  """
  @spec pong(non_neg_integer()) :: {:ok, {:pong, String.t()}}
  def pong(latency) when is_integer(latency) do
    Process.sleep(latency)
    pong()
  end
156

157
  # Schedules a `:ping` message to the calling process `@every` milliseconds from now
  # and returns the timer reference.
  defp ping_after do
    server = self()

    Process.send_after(server, :ping, @every)
  end
160
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc