• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / 6a1d77a5420203dcc840c17c888d82dff37f69e6

10 Dec 2025 03:40AM UTC coverage: 80.087% (-7.8%) from 87.901%
6a1d77a5420203dcc840c17c888d82dff37f69e6

push

github

web-flow
fix: improve metrics handling (#1654)

* Allow 3x more max heap size than other processes
* Avoid compressing given that gen_rpc does this for us already
* fix: remove limit_concurrent metric
* fix: revert the way we fetched tenants to report on connected metric

2 of 18 new or added lines in 3 files covered. (11.11%)

203 existing lines in 24 files now uncovered.

2200 of 2747 relevant lines covered (80.09%)

2956.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.63
/lib/realtime/syn_handler.ex
1
defmodule Realtime.SynHandler do
2
  @moduledoc """
3
  Custom defined Syn's callbacks
4
  """
5
  require Logger
6
  alias Realtime.Syn.PostgresCdc
7
  alias Realtime.Tenants.Connect
8
  alias RealtimeWeb.Endpoint
9

10
  @behaviour :syn_event_handler
11

12
  @postgres_cdc_scope_prefix PostgresCdc.syn_topic_prefix()
13

14
  @doc """
  Invoked by :syn when the metadata of a registered process is updated.

  Two cases are handled:

    * `Connect` scope, reason `:normal`, and metadata carrying a `conn` pid: a
      `"ready"` event with the process pid and the connection pid is broadcast
      locally on the tenant's `Connect` topic.
    * Any scope whose name starts with the Postgres CDC scope prefix: a
      `"ready"` event carrying the metadata is broadcast locally on the
      tenant's Postgres CDC topic.

  Every other update is ignored and `:ok` is returned.
  """
  @impl true
  def on_registry_process_updated(Connect, tenant_id, pid, %{conn: conn}, :normal) when is_pid(conn) do
    # The metadata now holds a connection pid, so listeners can be told it is ready
    ready_topic = Connect.syn_topic(tenant_id)
    Endpoint.local_broadcast(ready_topic, "ready", %{pid: pid, conn: conn})
  end

  def on_registry_process_updated(scope, tenant_id, _pid, meta, _reason) do
    cdc_scope? =
      scope
      |> Atom.to_string()
      |> String.starts_with?(@postgres_cdc_scope_prefix)

    if cdc_scope? do
      Endpoint.local_broadcast(PostgresCdc.syn_topic(tenant_id), "ready", meta)
    else
      :ok
    end
  end
31

32
  @doc """
  Invoked by :syn when a registered process is unregistered.

  A `"<prefix>_down"` event carrying the process pid and the unregistration
  reason is broadcast locally through the endpoint, so that subscribers can
  react to it:

    * for scopes whose name starts with the Postgres CDC scope prefix, the event
      name is the full scope name followed by `"_down"` and it is sent on the
      Postgres CDC topic for `name`;
    * for every other scope, the last segment of the underscored scope name is
      used both as the topic prefix (`"<segment>:<name>"`) and as the event
      prefix (`"<segment>_down"`).

  When the reason is `:syn_conflict_resolution` a warning is logged, which makes
  it visible that more than one process was registered under the same name and
  one of them was dropped by :syn.

  Always returns `:ok`.
  """
  @impl true
  def on_process_unregistered(scope, name, pid, _meta, reason) do
    payload = %{pid: pid, reason: reason}
    scope_string = Atom.to_string(scope)

    if String.starts_with?(scope_string, @postgres_cdc_scope_prefix) do
      Endpoint.local_broadcast(PostgresCdc.syn_topic(name), scope_string <> "_down", payload)
    else
      short_scope = topic(scope)
      Endpoint.local_broadcast(short_scope <> ":" <> name, short_scope <> "_down", payload)
    end

    case reason do
      :syn_conflict_resolution ->
        log("#{scope} terminated due to syn conflict resolution: #{inspect(name)} #{inspect(pid)}")

      _other ->
        nil
    end

    :ok
  end
57

58
  @doc """
  Invoked by :syn when two processes are registered under the same name.

  The process to keep is chosen by `decide/3`, which only looks at the node
  names of the two pids and a hash of `name`; the metadata and registration
  times passed in by :syn are not used. The choice depends on nothing local to
  the node, so every node evaluating the same conflict picks the same pid.

  If the process to stop lives on the node running this callback, it is stopped
  here: it first receives an exit with reason `{:shutdown, :syn_conflict_resolution}`
  and, if it has not gone down after 5 seconds, an untrappable `:kill`.
  Otherwise the decision is only logged.

  Returns the pid that must stay registered.
  """
  @impl true
  def resolve_registry_conflict(mod, name, {pid1, _meta1, _time1}, {pid2, _meta2, _time2}) do
    {pid_to_keep, pid_to_stop} = decide(pid1, pid2, name)

    # Only the node that hosts the losing process is responsible for stopping it
    stop_here? = node(pid_to_stop) == node()

    case stop_here? do
      true ->
        log(
          "Resolving conflict on scope #{inspect(mod)} for name #{inspect(name)} {#{node(pid1)}, #{inspect(pid1)}} vs {#{node(pid2)}, #{inspect(pid2)}}, stop local process: #{inspect(pid_to_stop)}"
        )

        stop(pid_to_stop)

      false ->
        log(
          "Resolving conflict on scope #{inspect(mod)} for name #{inspect(name)} {#{node(pid1)}, #{inspect(pid1)}} vs {#{node(pid2)}, #{inspect(pid2)}}, remote process will be stopped: #{inspect(pid_to_stop)}"
        )
    end

    pid_to_keep
  end
86

87
  # Stops `pid_to_stop` from a separate, unlinked process so the caller is not
  # blocked while waiting. Returns the pid of that spawned process.
  defp stop(pid_to_stop) do
    spawn(fn -> shutdown_then_kill(pid_to_stop) end)
  end

  # Sends a shutdown exit signal to `pid_to_stop` and waits for its :DOWN
  # message. If none arrives within the grace period, sends :kill instead.
  defp shutdown_then_kill(pid_to_stop) do
    grace_period_ms = 5000

    Process.monitor(pid_to_stop)
    Process.exit(pid_to_stop, {:shutdown, :syn_conflict_resolution})

    receive do
      {:DOWN, _ref, :process, ^pid_to_stop, reason} ->
        log("Successfully stopped #{inspect(pid_to_stop)}. Reason: #{inspect(reason)}")
    after
      grace_period_ms ->
        log("Timed out while waiting for process #{inspect(pid_to_stop)} to stop. Sending kill exit signal")
        Process.exit(pid_to_stop, :kill)
    end
  end
102

103
  # Logs `message` at warning level, prefixed with the name of the local node.
  defp log(message) do
    Logger.warning("SynHandler(#{node()}): #{message}")
  end
1✔
104

105
  # Picks the survivor of a conflict and returns `{pid_to_keep, pid_to_stop}`.
  #
  # Only the node names of the two pids and the conflicting name are used, so
  # the result does not depend on which node runs the function or on timing.
  #
  # The name is hashed into 0 or 1 and that bit flips the preference, so a given
  # pair of nodes does not always lose to the same side:
  #   * hash 1 -> keep pid1 only when its node name sorts strictly before pid2's
  #   * hash 0 -> keep pid2 when pid1's node name sorts strictly before pid2's
  defp decide(pid1, pid2, name) do
    name_bit = :erlang.phash2(name, 2)
    pid1_node_first? = node(pid1) < node(pid2)

    case {name_bit, pid1_node_first?} do
      {1, true} -> {pid1, pid2}
      {1, false} -> {pid2, pid1}
      {_, true} -> {pid2, pid1}
      {_, false} -> {pid1, pid2}
    end
  end
127

128
  # Returns the last "/"-separated segment of the underscored form of `mod`,
  # e.g. `Realtime.Tenants.Connect` becomes "connect".
  defp topic(mod) do
    segments = String.split(Macro.underscore(mod), "/")
    List.last(segments)
  end
135
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc